Re: consumer_offsets partition skew and possibly ignored retention

2016-11-02 Thread Chi Hoang
I tried running reassignment on the topic, but that didn't help.  I had to
restart the broker for it to release the file handles, then manually
delete.

On Fri, Oct 28, 2016 at 6:25 PM, James Brown  wrote:

> I was having this problem with one of my __consumer_offsets partitions; I
> used reassignment to move the large partition onto a different set of
> machines (which forced the cleaner to run through them again) and after the
> new machines finished replicating, the partition was back down from 41GB to
> a nice trim 38MB.
>
> On Fri, Oct 28, 2016 at 1:00 PM, Chi Hoang  wrote:
>
> > Hi,
> > We have a 3-node cluster that is running 0.9.0.1, and recently saw that
> the
> > "__consumer_offsets" topic on one of the nodes seems really skewed with
> > disk usage that looks like:
> >
> > 73G ./__consumer_offsets-10
> > 0   ./__consumer_offsets-7
> > 0   ./__consumer_offsets-4
> > 0   ./__consumer_offsets-1
> > 0   ./__consumer_offsets-49
> > 19G ./__consumer_offsets-46
> > 0   ./__consumer_offsets-43
> > 0   ./__consumer_offsets-40
> >
> >
> >
> > This goes on for all 50 partitions.  Upon inspection, we saw that a lot
> of
> > the log files were old:
> >
> > ll __consumer_offsets-10
> > total 76245192
> > -rw-r--r-- 1 root root 0 Oct  7 20:14 .index
> > -rw-r--r-- 1 root root   901 Oct  7 20:14 .log
> > -rw-r--r-- 1 root root    157904 Oct  7 22:15 000907046457.index
> > -rw-r--r-- 1 root root 104855056 Oct  7 22:15 000907046457.log
> > -rw-r--r-- 1 root root    157904 Oct  7 22:51 000909543421.index
> > -rw-r--r-- 1 root root 104853568 Oct  7 22:51 000909543421.log
> > -rw-r--r-- 1 root root    157904 Oct  7 23:27 000910806717.index
> > -rw-r--r-- 1 root root 104853568 Oct  7 23:27 000910806717.log
> >
> >
> > We are using default parameters as they pertain to offset management, and
> > our config output includes the following entries:
> >
> > log.cleaner.enable = true
> >
> > offsets.retention.minutes = 1440
> >
> >
> > I tried looking through the issues on JIRA but didn't see a reported
> > issue.  Does anyone know what's going on, and how I can fix this?
> >
> > Thanks.
> >
>
>
>
> --
> James Brown
> Engineer
>
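
[Archive note] For readers landing on this thread later: the reassignment workaround James describes can be sketched as below. This is a sketch under assumptions -- the broker IDs, ZooKeeper address, and target partition are illustrative, and the actual kafka-reassign-partitions.sh call needs a live cluster, so it is left commented out.

```shell
# Force the log cleaner to re-process a bloated __consumer_offsets
# partition by reassigning it to a different set of brokers.
# Broker IDs (4, 5, 6) are hypothetical; adjust to your cluster.
cat > offsets-reassign.json <<'EOF'
{
  "version": 1,
  "partitions": [
    {"topic": "__consumer_offsets", "partition": 10, "replicas": [4, 5, 6]}
  ]
}
EOF

# Sanity-check the JSON before handing it to the tool; the reassignment
# command rejects malformed files with a confusing "file is empty" error.
python3 -m json.tool offsets-reassign.json > /dev/null && echo "json ok"

# Requires a live cluster; shown for illustration only:
# bin/kafka-reassign-partitions.sh --zookeeper zk-host:2181 \
#   --reassignment-json-file offsets-reassign.json --execute
```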


consumer_offsets partition skew and possibly ignored retention

2016-10-28 Thread Chi Hoang
Hi,
We have a 3-node cluster that is running 0.9.0.1, and recently saw that the
"__consumer_offsets" topic on one of the nodes seems really skewed with
disk usage that looks like:

73G ./__consumer_offsets-10
0   ./__consumer_offsets-7
0   ./__consumer_offsets-4
0   ./__consumer_offsets-1
0   ./__consumer_offsets-49
19G ./__consumer_offsets-46
0   ./__consumer_offsets-43
0   ./__consumer_offsets-40



This goes on for all 50 partitions.  Upon inspection, we saw that a lot of
the log files were old:

ll __consumer_offsets-10
total 76245192
-rw-r--r-- 1 root root 0 Oct  7 20:14 .index
-rw-r--r-- 1 root root   901 Oct  7 20:14 .log
-rw-r--r-- 1 root root    157904 Oct  7 22:15 000907046457.index
-rw-r--r-- 1 root root 104855056 Oct  7 22:15 000907046457.log
-rw-r--r-- 1 root root    157904 Oct  7 22:51 000909543421.index
-rw-r--r-- 1 root root 104853568 Oct  7 22:51 000909543421.log
-rw-r--r-- 1 root root    157904 Oct  7 23:27 000910806717.index
-rw-r--r-- 1 root root 104853568 Oct  7 23:27 000910806717.log


We are using default parameters as they pertain to offset management, and
our config output includes the following entries:

log.cleaner.enable = true

offsets.retention.minutes = 1440


I tried looking through the issues on JIRA but didn't see a reported
issue.  Does anyone know what's going on, and how I can fix this?

Thanks.


Re: Raid vs individual disks

2015-08-21 Thread Chi Hoang
We are running with a JBOD configuration, and we don't recommend it, for the
following reasons:
- any volume failure causes an unclean shutdown and requires lengthy
recovery
- data is not distributed consistently across volumes, so you could have
skew within a broker

We are planning to switch to a RAID-10 implementation.

Chi

On Fri, Aug 21, 2015 at 1:59 PM, Todd Palino  wrote:

> At LinkedIn, we are using a RAID-10 of 14 disks. This is using software
> RAID. I recently did some performance testing with RAID 0, 5, and 6. I
> found that 5 and 6 underperformed significantly, possibly due to the parity
> calculations. RAID 0 had a sizable performance gain over 10, and I would
> expect single disks to perform similarly. I didn't test it because it lacks
> some balancing ability that we would need.
>
> -Todd
>
>
> On Friday, August 21, 2015, Prabhjot Bharaj  wrote:
>
> > Hi,
> >
> > I've gone through the details mentioned about RAID and individual disks
> > in the ops section of the documentation.
> >
> > But I would like to know what performance boost we can get with
> > individual disks.
> > Is anybody using Kafka with multiple disks, or does everyone RAID them
> > into one big disk?
> >
> > Regards,
> > Prabcs
> >
>


Re: Producer does not recognize new brokers

2015-04-13 Thread Chi Hoang
I highly recommend https://github.com/airbnb/kafkat, which will simplify
your partition management tasks.  Use it with
https://github.com/airbnb/kafkat/pull/3 for partition specific reassignment.

Chi

On Mon, Apr 13, 2015 at 4:08 AM, Jan Filipiak 
wrote:

> Hey,
>
> try not to have newlines (\n) in your JSON file. I think the parser dies on
> those and then claims the file is empty
>
> Best
> Jan
>
>
>
>
>
> On 13.04.2015 12:06, Ashutosh Kumar wrote:
>
>> Probably you should first try to generate a proposed plan using the
>> --generate option and then edit it if needed.
>> Thanks
>>
>>
>> On Mon, Apr 13, 2015 at 3:12 PM, shadyxu  wrote:
>>
>>  Thanks guys. You are right, and here comes another problem:
>>>
>>> I added new brokers 4, 5 and 6. Now I want to move partitions 3, 4 and
>>> 5 (currently on brokers 1, 2 and 3) of topic test to these brokers. I
>>> wrote an r.json file like this:
>>>
>>> {"partitions":
>>> [{"topic": "test","partition": 3,"replicas": [4]},
>>> {"topic":"test","partition":4,"replicas":[5]},
>>> {"topic":"test","partition":5,"replicas":[6]},],
>>> "version":1}
>>>
>>> and then ran:
>>>
>>>  bin/kafka-reassign-partitions.sh --zookeeper [some-kafka-address]
>>> --reassignment-json-file r.json --execute
>>>
>>> Kafka gave me this error message:
>>>
>>>  kafka.common.AdminCommandFailedException: Partition reassignment
>>> data
>>> file r.json is empty
>>>
>>> I googled; it seems Kafka parsed the JSON file but found that no
>>> partitions needed to be moved. Was my JSON file not properly configured?
>>>
>>> 2015-04-13 14:00 GMT+08:00 Ashutosh Kumar :
>>>
>>>  I think you need to rebalance the cluster.
 something like

 bin/kafka-reassign-partitions.sh --zookeeper localhost:2181
 --topics-to-move-json-file topics-to-move.json --broker-list "5,6"
 --generate


 On Mon, Apr 13, 2015 at 11:22 AM, shadyxu  wrote:

> I added several new brokers to the cluster; there should've been a
> rebalance, but it seemed that the producer was not aware of the new
> brokers. Data kept being sent to the old brokers and there were no
> partitions on the new brokers.
>
> I configured the old brokers to the producer and did not restart the
> producer or add the new brokers to the configuration.
>
> What may be the problems?
>
>
>
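
[Archive note] A detail worth checking in the r.json above: the trailing comma after the last partitions entry. Strict JSON parsers reject trailing commas, which could plausibly make the tool see no partitions and report the file as empty (an observation from the JSON itself, not confirmed in this thread). A sketch comparing the two forms, using python3 purely as a convenient strict-JSON validator:

```shell
# The original file, with a trailing comma after the last replicas entry:
cat > r-bad.json <<'EOF'
{"partitions":
[{"topic": "test","partition": 3,"replicas": [4]},
{"topic":"test","partition":4,"replicas":[5]},
{"topic":"test","partition":5,"replicas":[6]},],
"version":1}
EOF

# The same content without the trailing comma:
cat > r-good.json <<'EOF'
{"partitions":
[{"topic": "test","partition": 3,"replicas": [4]},
{"topic":"test","partition":4,"replicas":[5]},
{"topic":"test","partition":5,"replicas":[6]}],
"version":1}
EOF

# A strict parser rejects the first form and accepts the second.
python3 -m json.tool r-bad.json > /dev/null 2>&1 && echo "bad: parses" || echo "bad: rejected"
python3 -m json.tool r-good.json > /dev/null 2>&1 && echo "good: parses" || echo "good: rejected"
```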


Re: Kafka elastic no downtime scalability

2015-03-13 Thread Chi Hoang
Hi Stevo,
I won't speak for Joe, but what we do is documented in the link that Joe
provided:
"Adding servers to a Kafka cluster is easy, just assign them a unique
broker id and start up Kafka on your new servers. However these new servers
will not automatically be assigned any data partitions, so unless
partitions are moved to them they won't be doing any work until new topics
are created. So usually when you add machines to your cluster you will want
to migrate some existing data to these machines.

The process of migrating data is manually initiated but fully automated.
Under the covers what happens is that Kafka will add the new server as a
follower of the partition it is migrating and allow it to fully replicate
the existing data in that partition. When the new server has fully
replicated the contents of this partition and joined the in-sync replicas,
one of the existing replicas will delete their partition's data.

The partition reassignment tool can be used to move partitions across
brokers. An ideal partition distribution would ensure even data load and
partition sizes across all brokers. In 0.8.1, the partition reassignment
tool does not have the capability to automatically study the data
distribution in a Kafka cluster and move partitions around to attain an
even load distribution. As such, the admin has to figure out which topics
or partitions should be moved around."
We use a tool called kafkat (https://github.com/airbnb/kafkat) for
reassignment and other administrative tasks, and have added brokers and
partitions without any problems.  The manual part is that you manually
initiate the commands, but Kafka takes care of the rest without any
interruption to producers and consumers.  I also want to make clear that
kafkat is not necessary but makes it much easier.

Hope this helps clarify your doubts.

Chi

On Fri, Mar 13, 2015 at 4:19 PM, Stevo Slavić  wrote:

> These features are all nice, if one adds new brokers to support additional
> topics, or to move existing partitions or whole topics to new brokers.
> The referenced sentence is in the paragraph on scalability. When I read
> "expanded" I was thinking of scaling out, extending parallelization
> capabilities, and parallelism in Kafka is achieved with partitions. So I
> understood that sentence that it is possible to add more partitions to
> existing topics at runtime, with no downtime.
>
> I just found in source that there is API for adding new partitions to
> existing topics (see
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/AdminUtils.scala#L101
> ). I have to try it. I guess it should work at runtime, causing no
> downtime or data loss, and without moving data from existing to new
> partitions. Producers and consumers will eventually start writing to and
> reading from the new partitions, and consumers should be able to read
> previously published messages from old partitions, even messages which,
> if they were sent again, would be assigned to a new partition.
>
>
> Kind regards,
> Stevo Slavic.
>
> On Fri, Mar 13, 2015 at 8:27 PM, Joe Stein  wrote:
>
> > https://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
> >
> > ~ Joe Stein
> > - - - - - - - - - - - - - - - - -
> >
> >   http://www.stealth.ly
> > - - - - - - - - - - - - - - - - -
> >
> > On Fri, Mar 13, 2015 at 3:05 PM, sunil kalva 
> wrote:
> >
> > > Joe
> > >
> > > "Well, I know it is semantic, but right now it "can" be elastically
> > > scaled without downtime; you have to integrate it into your
> > > environment for what that means. It has been that way since 0.8.0,
> > > imho."
> > >
> > > here what do you mean "you have to integrate into your environment",
> how
> > do
> > > i achieve elastically scaled cluster seamlessly ?
> > >
> > > SunilKalva
> > >
> > > On Fri, Mar 13, 2015 at 10:27 PM, Joe Stein 
> > wrote:
> > >
> > > > Well, I know it is semantic, but right now it "can" be elastically
> > > > scaled without downtime; you have to integrate it into your
> > > > environment for what that means. It has been that way since 0.8.0,
> > > > imho.
> > > >
> > > > My point was just another way to do that out of the box... folks do
> > > > this elastic scaling today with AWS CloudFormation and internal
> > > > systems they built too.
> > > >
> > > > So, it can be done... you just have to do it.
> > > >
> > > > ~ Joe Stein
> > > > - - - - - - - - - - - - - - - - -
> > > >
> > > >   http://www.stealth.ly
> > > > - - - - - - - - - - - - - - - - -
> > > >
> > > > On Fri, Mar 13, 2015 at 12:39 PM, Stevo Slavić 
> > > wrote:
> > > >
> > > > > OK, thanks for heads up.
> > > > >
> > > > > When reading Apache Kafka docs, and reading what Apache Kafka
> "can" I
> > > > > expect it to already be available in latest general availability
> > > release,
> > > > > not what's planned as part of some other project.
> > > > >
> > > > > Kind regards,
> > > > > Stevo Slavic.
> > > > >
> > > > > On Fri, Mar 13, 2015 at 2:32 PM, Joe Stein 
> > > wrote:
>
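
[Archive note] A hedged sketch of the add-partitions path Stevo mentions, as exposed through the CLI rather than calling AdminUtils directly. The kafka-topics.sh flags follow the 0.8.x tool's documented usage; the ZooKeeper address and partition count are placeholders, and the commands are only printed here since running them needs a live cluster:

```shell
# Grow an existing topic from its current partition count to 12.
# Placeholders: zk-host:2181 and the topic name are examples only.
zk="zk-host:2181"
topic="test"

# Printed rather than executed; with a live cluster, drop the echo.
echo "bin/kafka-topics.sh --zookeeper $zk --alter --topic $topic --partitions 12"

# Caveats: existing data is not moved onto the new partitions, and the
# key-to-partition mapping changes, so keyed ordering only holds for
# messages produced after the change.
echo "bin/kafka-topics.sh --zookeeper $zk --describe --topic $topic"
```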

Re: Trouble with snappy and SimpleConsumer

2014-06-06 Thread Chi Hoang
We are using Kafka 0.8.1.1 (Scala 2.9.2) rebuilt with Snappy 1.1.0.1, and don't
have problems with producers or consumers.

Chi


On Thu, Jun 5, 2014 at 9:29 PM, Vinay Gupta 
wrote:

> Hi,
>   I am using
> kafka_2.9.2-0.8.1and
> snappy-java-1.1.0.1.jar
>
>
> I have been able to successfully use gzip with the same library; however,
> "snappy" doesn't work on the consumer side.
> The producer is able to send snappy messages to the broker, though.
> I have made sure that snappy java lib is the same on both consumer and
> producer side.
>
> Is there a problem with snappy 1.1.0.1 when used with Kafka? Should I
> switch to an older version?
>
> Is anyone else having this problem? Thanks in advance for any pointers.
>
>
> Thanks
> -Vinay
>
>
> ——Exception ——
>
> topic=events partition=2 leaderId=3 beginOffset=2225451 offset=2225451
> checksum=0 time=1402008438921
> java.lang.Exception:
> org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/lang/Object;II)I
> at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
> at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:541)
> at
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:350)
> at
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
> at
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> at java.io.InputStream.read(InputStream.java:82)
> at
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply$mcI$sp(Unknown
> Source)
> at
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(Unknown
> Source)
> at
> kafka.message.ByteBufferMessageSet$$anonfun$decompress$1.apply(Unknown
> Source)
> at
> scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> at kafka.message.ByteBufferMessageSet$.decompress(Unknown Source)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(Unknown Source)
> at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(Unknown
> Source)
> at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(Unknown
> Source)
> at kafka.utils.IteratorTemplate.maybeComputeNext(Unknown Source)
> at kafka.utils.IteratorTemplate.hasNext(Unknown Source)
> at
> kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(Unknown Source)
> :
> :
> :
> :
> Caused by: java.lang.UnsatisfiedLinkError:
> org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/lang/Object;II)I
> ... 31 more
>
>
>
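
[Archive note] One hedged diagnostic for the UnsatisfiedLinkError above: a second, older snappy-java jar on the classpath can shadow the intended one, so the native method stubs the JVM loads no longer match the bytecode Kafka was built against. The sketch below scans a directory for duplicate snappy-java jars; the directory and jar filenames are stand-ins for a real deployment's classpath:

```shell
# Stand-in for an application's lib directory; in practice, point this
# at the directories that actually make up your classpath.
libdir=$(mktemp -d)
touch "$libdir/snappy-java-1.0.5.jar" "$libdir/snappy-java-1.1.0.1.jar"

# More than one snappy-java jar is a common cause of UnsatisfiedLinkError:
# whichever version the classloader finds first wins, and its native
# signatures may differ from what the calling bytecode expects.
count=$(find "$libdir" -name 'snappy-java-*.jar' | wc -l)
if [ "$count" -gt 1 ]; then
  echo "WARNING: $count snappy-java jars found:"
  find "$libdir" -name 'snappy-java-*.jar' -exec basename {} \; | sort
fi
```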


Re: Kafka Queue Depth Metrics

2014-03-31 Thread Chi Hoang
We've used bytes written as "in", and bytes read as "out".

Chi


On Mon, Mar 31, 2014 at 11:12 AM, Arnaud Lawson wrote:

> Ok, thanks Guozhang. So what would the metrics for the in-traffic and
> out-traffic message count be on this page (this page shows all existing JMX
> metrics)?
>
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
>
>
> On Mon, Mar 31, 2014 at 1:01 PM, Guozhang Wang  wrote:
>
> > Hi Arnaud,
> >
> > Currently we do not have metric(s) for the number of messages on the
> > Kafka broker, though we do have metrics for the in-traffic and
> > out-traffic message count.
> >
> > One workaround would be the ConsumerOffsetChecker tool, which shows the
> > last offset of messages on the brokers vs. the last offsets consumed by
> > the specified consumer on certain topics.
> >
> > Guozhang
> >
> >
> > On Mon, Mar 31, 2014 at 8:36 AM, Arnaud Lawson  > >wrote:
> >
> > > Hi,
> > >
> > > Does anyone know what the JMX metric(s) are for determining the depth
> of
> > > the kafka queue? I basically want to know the amount of messages that
> are
> > > on the queue at a certain point in time. Please let me know if there
> is a
> > > way to find that out. Thanks.
> > >
> > > --
> > >
> > > Arnaud Lawson
> > > ,
> > >
> > > Systems Operations Engineer
> > > *VELOS*
> > > Accelerating Machine Learning.
> > >
> > > 440 9th AVE, 11TH FLOOR, NEW YORK, NY 10001
> > > C: (240) 393 - 6703
> > > F: 646.349.4063
> > > E: arnaud.law...@velos.io
> > > W: www.velos.io
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
>
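
[Archive note] A closing note on computing the "depth" asked about above: per partition it is the broker's log-end offset minus the consumer's committed offset, which is what ConsumerOffsetChecker reports as lag. A sketch over made-up sample offsets (with a live cluster, the real numbers come from the tool's output):

```shell
# Columns: partition  log_end_offset  consumer_offset
# Sample data standing in for ConsumerOffsetChecker output.
cat <<'EOF' | awk '{ lag = $2 - $3; total += lag
                     printf "partition %s lag %d\n", $1, lag }
                   END { printf "total backlog %d messages\n", total }'
0 15230 15100
1 9875 9875
2 20440 20012
EOF

# With a live cluster, the real per-partition offsets come from (0.8.x):
# bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
#   --zookeeper zk-host:2181 --group my-group
```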