Re: partition reassignment stuck

2015-04-21 Thread Wesley Chow
There is only one broker that thinks it's the controller right now.  The
double controller situation happened earlier this morning. Do the other
brokers have to be bounced after the controller situation is fixed? I did
not do that for all brokers.

Wes
 On Apr 21, 2015 8:25 PM, "Jiangjie Qin"  wrote:

>  Yes, it should be broker 25, thread 0, from the log.
> This needs to be resolved; you might need to bounce both of the brokers
> that think they are the controller. The new controller should then be
> able to continue the partition reassignment.
>
>   From: Wes Chow 
> Reply-To: "users@kafka.apache.org" 
> Date: Tuesday, April 21, 2015 at 1:29 PM
> To: "users@kafka.apache.org" 
> Subject: Re: partition reassignment stuck
>
>
> Quick clarification: you say broker 0, but do you actually mean broker 25?
> 25, one of the replicas for the partition, is currently the one having
> trouble getting into sync, and 28 is the leader for the partition.
>
> Unfortunately, the logs have rotated off so I can't get to what happened
> around then. However, there was a period of a few hours where we had
> two brokers that both believed they were controllers. I'm not sure why I
> didn't think of this before.
>
> ZooKeeper data appears to be inconsistent at the moment.
> /brokers/topics/click_engage says that partition 116's replica set is: [4,
> 7, 25]. /brokers/topics/click_engage/partitions/116/state says the leader
> is 28 and the ISR is [28, 15]. Does this need to be resolved, and if so how?
>
> Thanks,
> Wes
>
>   Jiangjie Qin 
> April 21, 2015 at 2:24 PM
>   This means that broker 0 thought broker 28 was the leader for that
> partition, but broker 28 had actually already received a StopReplicaRequest
> from the controller and stopped serving as a replica for that partition.
> This might happen transiently, but broker 0 will be able to find the new
> leader for the partition once it receives a LeaderAndIsrRequest from the
> controller with the new leader information. If these messages keep getting
> logged for a long time then there might be an issue.
> Can you check the timestamps around [2015-04-21 12:15:36,585] on
> broker 28 to see if there is an error log? The error log might not have
> partition info included.
>
>   From: Wes Chow 
> Reply-To: "users@kafka.apache.org" 
> Date: Tuesday, April 21, 2015 at 10:50 AM
> To: "users@kafka.apache.org" 
> Subject: Re: partition reassignment stuck
>
>
> Not for that particular partition, but I am seeing these errors on 28:
>
> kafka.common.NotAssignedReplicaException: Leader 28 failed to record
> follower 25's position 0 for partition [click_engage,116] since the replica
> 25 is not recognized to be one of the assigned replicas for partition
> [click_engage,116]
> at
> kafka.cluster.Partition.updateLeaderHWAndMaybeExpandIsr(Partition.scala:231)
> at
> kafka.server.ReplicaManager.recordFollowerPosition(ReplicaManager.scala:432)
> at
> kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:460)
> at
> kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:458)
> at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:176)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
> at
> kafka.server.KafkaApis.maybeUpdatePartitionHw(KafkaApis.scala:458)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:424)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:745)
>
> What does this mean?
>
> Thanks!
> Wes
>

leader election rate

2015-04-24 Thread Wesley Chow
Looking at the output from the jmx stats from our Kafka cluster, I see a
more or less constant leader election rate of around 2.5 from our
controller. Is this expected, or does this mean that leaders are shifting
around constantly?

If they are shifting, how should I go about debugging, and what triggers a
leader election?

Thanks,
Wes


Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-16 Thread Wesley Chow
A call with Amazon confirmed instability for d2 and c4 instances triggered
by lots of network activity. They fixed the problem and have since rolled
it out. We've been running Kafka with d2's for a little while now and so
far so good.

Wes


On Tue, Jun 2, 2015 at 1:39 PM, Wes Chow  wrote:

>
> We have run d2 instances with Kafka. They're currently unstable -- yesterday
> Amazon confirmed a host issue with d2 instances that gets tickled by a Kafka
> workload. Otherwise, the d2 instance type seems ideal, as it
> gets an enormous amount of disk throughput and you'll likely be network
> bottlenecked.
>
> Wes
>
>
>   Steven Wu 
>  June 2, 2015 at 1:07 PM
> EBS (network-attached storage) has gotten a lot better over the last few
> years. We don't quite trust it for a Kafka workload.
>
> At Netflix, we are going with the new d2 instance type (HDD). Our
> perf/load testing shows it satisfies our workload. SSD has a better latency
> curve but is pretty comparable in terms of throughput. We can use the extra
> space from the HDDs for a longer retention period.
>
> On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai 
> 
>
>   Henry Cai 
>  June 2, 2015 at 12:37 PM
> We have been hosting Kafka brokers in Amazon EC2 and we are using EBS
> disks, but periodically we were hit by long I/O wait times on EBS in some
> Availability Zones.
>
> We are thinking of changing the instance types to local HDD or local SSD.
> HDD is cheaper and bigger and seems a good fit for the Kafka use case, which
> is mostly sequential read/write. But some early experiments show the HDD
> cannot keep up with the message-producing rate, since the many
> topics/partitions on the broker make the disk I/O effectively random.
>
> What has been people's experience choosing disk types on Amazon?
>
>


Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-16 Thread Wesley Chow
Should not matter. We're running 12.04.

Wes
 On Jun 16, 2015 12:18 PM, "Henry Cai"  wrote:

> Does it still matter whether we are using Ubuntu 14 or 12?
>


is a topic compressed?

2017-09-20 Thread Wesley Chow
I have a producer configured to snappy compress data sent to a cluster. Is
there some way to verify that the data indeed is being compressed? If I
peek at the .log files on the broker, I can read some plain text amongst
binary. Similarly, tcpdump shows plain text readable data. I do not know if
this is evidence that compression is not working, but is there a better way
to verify that compression is turned on?

Wes


Re: is a topic compressed?

2017-09-26 Thread Wesley Chow
Excellent, the DumpLogSegment tool did the trick!

Wes
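
For reference, something along the lines of bin/kafka-run-class.sh
kafka.tools.DumpLogSegments --files <segment .log file> --deep-iteration
should print a compresscodec field for each message, which is what confirms
whether snappy is actually being applied (exact flag names can differ between
Kafka versions).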

On Thu, Sep 21, 2017 at 4:32 AM, Manikumar 
wrote:

> You can try the DumpLogSegments tool to verify messages from log files. This
> will give the compression type for each message.
> https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment
>
> On Thu, Sep 21, 2017 at 1:38 PM, Vincent Dautremont <
> vincent.dautrem...@olamobile.com.invalid> wrote:
>
> > Hi,
> > Snappy keeps a lot of parts in plain text:
> > look at the example where only "pedia" is encoded/tokenized in the sentence.
> > https://en.wikipedia.org/wiki/Snappy_(compression)
> >
> > > Wikipedia is a free, web-based, collaborative, multilingual encyclopedia
> > > project.
> >
> >
> > your data is then probably compressed with snappy.
> >
> > Another try would be to change compression to other values (or remove
> > compression) and compare the tcp dump with the one you already have.
> >
> >
> > Vincent.
> >
> > On Wed, Sep 20, 2017 at 11:58 PM, Wesley Chow  wrote:
> >
> > > I have a producer configured to snappy compress data sent to a cluster.
> > Is
> > > there some way to verify that the data indeed is being compressed? If I
> > > peek at the .log files on the broker, I can read some plain text
> amongst
> > > binary. Similarly, tcpdump shows plain text readable data. I do not
> know
> > if
> > > this is evidence that compression is not working, but is there a better
> > way
> > > to verify that compression is turned on?
> > >
> > > Wes
> > >
> >
> >
>


compacted log key limits

2016-09-15 Thread Wesley Chow
Is there any guidance on the maximum number of different keys in a compacted
log? For example, hard totals, or rules of thumb like "keys need to fit in
memory, message data does not", etc. Is it unreasonable to expect tens or
hundreds of millions of keys in a single topic to be handled gracefully?

Thanks,
Wes


drop in fetch-follower requests

2015-10-23 Thread Wesley Chow
We deployed a Samza process that effectively doubled our fetch rates in our
cluster and noticed a significant drop in fetch-follower requests. What is the
difference between fetch-consumer and fetch-follower requests?

[image: Inline image 1]

We noticed that some topics moved along undisturbed, sending the same
rate of data as before. Other topics saw a drop in traffic. The
pattern seems to be that the topics with a reduced message rate were
derived from our source topics, i.e., written by consumers of a source
topic that publish to another topic.

It doesn't seem like we hit any sort of machine resource limits. We're well
under network, CPU, and disk IO utilization. This is a 6 node cluster with
all topics replicated 3 times (ie, one leader, two followers).

Any thoughts on what might have happened?

Thanks,
Wes


Re: Hash partition of key with skew

2016-05-03 Thread Wesley Chow
I’ve come up with a couple solutions since we too have a power law 
distribution. However, we have not put anything into practice.

Fixed Slicing

One simple thing to do is to take each key and slice it into some fixed number 
of partitions. So your function might be:

(hash(key) % num) + (hash(key) % 10)

In order to distribute it across 10 partitions. Or:

hash(key + ‘0’) % num
hash(key + ‘1’) % num
…
hash(key + ‘9’) % num
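
A rough Python sketch of the second (salted-key) form, just to make the idea
concrete. The hash choice, the slice count of 10, and the function names are
illustrative, not from any particular client library:

    import hashlib
    import random

    def slice_partition(key, num_partitions, slices=10):
        # Producer side: append a per-message salt in [0, slices) so one key
        # fans out over `slices` partitions instead of always hitting one.
        salt = str(random.randrange(slices))
        digest = hashlib.md5((key + salt).encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % num_partitions

    def partitions_for_key(key, num_partitions, slices=10):
        # Consumer side: a reader interested in `key` must subscribe to every
        # partition the key can land on.
        return sorted({
            int.from_bytes(hashlib.md5((key + str(i)).encode("utf-8")).digest()[:8], "big")
            % num_partitions
            for i in range(slices)
        })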


Hyperspace Hashing

If your data is multi-dimensional, then you might find hyperspace hashing 
useful. I’ll give a simple example, but it’s easy to generalize. Suppose that 
you have two dimensions you’d like to partition on: customer id (C) and city 
location (L). You’d like to be able to subscribe to all data for some subset of 
customers, and you’d also like to be able to subscribe to all data for some 
subset of locations. Suppose that this data goes into a topic with 256 
partitions.

For any piece of data, you’d construct the partition it goes to like so:

((hash(C) % 16) << 4) + (hash(L) % 16)

What that is basically saying is: take C, map it to 16 different spots, and
set that as the high 4 bits of an 8-bit int. Then take the location, map it to 16
different spots, and set that as the lower 4 bits of the int. The resulting
number is the partition that piece of data goes to.

Now if you want one particular C, you subscribe to the 16 partitions that 
contain that C. If you want some particular L, you subscribe to the 16 
partitions that contain that L.

You can extend this scheme to an arbitrary number of dimensions subject to the 
number of partitions in the topic, and you can vary the number of bits that any 
particular dimension takes. This scheme suffers from a combinatorial explosion 
of partitions if you really want to query on lots of different dimensions, but 
you can see the Hyperdex paper for clues on how to deal with this.
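
To make the two-dimensional example concrete, a small Python sketch (256
partitions, 4 bits per dimension; the hash and the names are placeholders, not
a real partitioner implementation):

    import hashlib

    def _bucket(value, buckets=16):
        digest = hashlib.md5(value.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % buckets

    def hyperspace_partition(customer_id, location):
        # high 4 bits come from the customer dimension, low 4 bits from location
        return (_bucket(customer_id) << 4) | _bucket(location)

    def partitions_for_customer(customer_id):
        # the 16 partitions that can hold this customer, for any location
        return [(_bucket(customer_id) << 4) | loc for loc in range(16)]

    def partitions_for_location(location):
        # the 16 partitions that can hold this location, for any customer
        return [(cust << 4) | _bucket(location) for cust in range(16)]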


Unbalanced Hashing

It’s easy to generate ok but not great hash functions. One is DJB hash, which 
relies on two empirically determined constants:

http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
 


(5381 and 33 in the above example)

If you can do offline analysis, and your distribution doesn’t change over time, 
then you can basically exhaustively search for two values that produce a hash 
function that better distributes the load.
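
A sketch of what that search might look like in Python; the DJB-style hash and
the scoring function are illustrative, and the key weights are assumed to come
from your own offline analysis:

    def djb_hash(key, init=5381, mult=33):
        h = init
        for b in key.encode("utf-8"):
            h = (h * mult + b) & 0xFFFFFFFF
        return h

    def imbalance(weighted_keys, num_partitions, init, mult):
        # ratio of the heaviest partition to the average; 1.0 is perfectly even
        load = [0.0] * num_partitions
        for key, weight in weighted_keys:
            load[djb_hash(key, init, mult) % num_partitions] += weight
        return max(load) / (sum(load) / num_partitions)

    def search_constants(weighted_keys, num_partitions, candidates):
        # exhaustively try (init, mult) pairs and keep the most even one
        return min(candidates,
                   key=lambda c: imbalance(weighted_keys, num_partitions, *c))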


Greedy Knapsack

But if you’re ok doing offline analysis and generating your own hash function, 
then you can create one that’s simply a hard coded list of mappings for the 
heaviest keys, and then defaults to a regular hash for the rest. The easiest 
way to programmatically do this is to use a greedy algorithm:

  for each heavy key, k:
      assign k to the partition with the least assigned weight
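
In Python, that might look something like the following; the heavy-key list and
weights are assumed to come from offline analysis, and everything not in that
list falls back to a normal hash:

    import hashlib

    def build_hot_key_map(weighted_heavy_keys, num_partitions):
        # weighted_heavy_keys: iterable of (key, weight) from offline analysis
        load = [0.0] * num_partitions
        mapping = {}
        for key, weight in sorted(weighted_heavy_keys, key=lambda kw: -kw[1]):
            target = min(range(num_partitions), key=lambda p: load[p])
            mapping[key] = target
            load[target] += weight
        return mapping

    def partition_for(key, hot_key_map, num_partitions):
        if key in hot_key_map:
            return hot_key_map[key]
        digest = hashlib.md5(key.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % num_partitions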


The advantage to fixed slicing and hyperspace hashing is that you don’t have to 
know your distribution a priori, and it generally scales well as you increase 
the number of keys. The disadvantage is that one key’s data is split across 
multiple partitions.

The advantage to unbalanced hashing and greedy knapsack is that you can get 
close to an optimal partitioning scheme and all of one key resides in one 
partition. The downside is that you need to do partition mapping management as 
your distribution changes over time.

Hopefully that gives you some ideas!

Wes



> On May 3, 2016, at 9:09 AM, Jens Rantil  wrote:
> 
> Hi,
> 
> Not sure if this helps, but the way Loggly seem to do it is to have a
> separate topic for "noisy neighbors". See [1].
> 
> [1]
> https://www.loggly.com/blog/loggly-loves-apache-kafka-use-unbreakable-messaging-better-log-management/
> 
> Cheers,
> Jens
> 
> On Wed, Apr 27, 2016 at 9:11 PM Srikanth  wrote:
> 
>> Hello,
>> 
>> Is there a recommendation for handling producer side partitioning based on
>> a key with skew?
>> We want to partition on something like clientId. Problem is, this key has
>> a non-uniform distribution.
>> It's equally likely to see a key with 3k occurrences/day as one with
>> 100k/day or 65 million/day.
>> Cardinality of key is around 1500 and there are approx 1 billion records
>> per day.
>> Partitioning by hashcode(key)%numOfPartition will create a few "hot
>> partitions" and cause a few brokers (and consumer threads) to be overloaded.
>> Maybe these heavily loaded partitions are evenly distributed among
>> brokers, maybe they are not.
>> 
>> I read KIP-22
>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-22+-+Expose+a+Partitioner+interface+in+the+new+producer)
>> that explains how one could write a custom partitioner.
>> I'd like to know how it was used to solve such data skew.
>> We can compute some statistics on key distribution offline and use it in
>> the partitioner.
>> Is that a good idea? Or is it way too much logic for a partitioner?
>> Anything else to consider?
>> Any thoughts or reference will be helpful.
>> 
>> Thanks,
>

Re: Hash partition of key with skew

2016-05-03 Thread Wesley Chow
I’m not the OP, but in our case, we sometimes want data locality. For example, 
suppose that we have 100 consumers that are building up a cache of customer -> 
data mapping. If customer data is spread randomly across all partitions then a 
query for that customer’s data would have to hit all 100 consumers. If customer 
data exhibits some locality, then queries for that data only hit a subset of 
consumers.

Wes


> On May 3, 2016, at 11:18 AM, Tauzell, Dave  
> wrote:
> 
> Do you need the messages to be ordered in some way?   Why pass a key if you 
> don't want all the messages to go to one partition?
> 
> -Dave
> 
> Dave Tauzell | Senior Software Engineer | Surescripts
> O: 651.855.3042 | www.surescripts.com | dave.tauz...@surescripts.com
> 

Re: Hash partition of key with skew

2016-05-03 Thread Wesley Chow
> 
> Upload to S3 is partitioned by the "key" field. I.e, one folder per key. It
> does offset management to make sure offset commit is in sync with S3 upload.

We do this in several spots and I wish we had built our system in such a way 
that we could just open source it. I’m sure many people have solved this 
repeatedly. We’ve had significant disk performance issues when the number of 
keys is large (40,000-ish in our case) — you can’t be expected to open a file 
per key. That’s why something like the fixed slicing strategy I described can 
make a big difference.

Wes



Re: Hash partition of key with skew

2016-05-04 Thread Wesley Chow

We don’t do this on the Kafka side, but for a different system that has similar 
distribution problems we manually maintain a map of “hot” keys. On the Kafka 
side, we distribute keys with an even distribution in our largest volume topic, 
and then squash the data and repartition based on a skewed key. The resulting 
skew is somewhat insignificant compared to our largest volume topic that we 
tend to not care.

Wes


> On May 4, 2016, at 2:57 PM, Srikanth  wrote:
> 
> Yeah, fixed slicing may help. I'll put more thought into this.
> You had mentioned that you didn't put a custom partitioner into production.
> Would you mind sharing how you currently work around this?
> Srikanth
> 
> On Tue, May 3, 2016 at 5:43 PM, Wesley Chow  wrote:
> 
>>> 
>>> Upload to S3 is partitioned by the "key" field. I.e, one folder per key.
>> It
>>> does offset management to make sure offset commit is in sync with S3
>> upload.
>> 
>> We do this in several spots and I wish we had built our system in such a
>> way that we could just open source it. I’m sure many people have solved
>> this repeatedly. We’ve had significant disk performance issues when the
>> number of keys is large (40,000-ish in our case) — you can’t be expected to
>> open a file per key. That’s why something like the fixed slicing strategy I
>> described can make a big difference.
>> 
>> Wes
>> 
>> 



compacted log TTLs

2016-05-12 Thread Wesley Chow
Are there any thoughts on supporting TTLs on keys in compacted logs? In
other words, some way to set on a per-key basis a time to auto-delete.

Wes


Re: compacted log TTLs

2016-05-12 Thread Wesley Chow
Right, I’m trying to avoid explicitly managing TTLs. It’s nice being able to 
just produce keys into Kafka without having an accompanying vacuum consumer.

Wes


> On May 12, 2016, at 5:15 PM, Benjamin Manns  wrote:
> 
> If you send a NULL value to a compacted log, after the retention period it
> will be removed. You could run a process that reprocesses the log and sends
> a NULL to keys you want to purge based on some custom logic.
> 
> On Thu, May 12, 2016 at 2:01 PM, Wesley Chow  wrote:
> 
>> Are there any thoughts on supporting TTLs on keys in compacted logs? In
>> other words, some way to set on a per-key basis a time to auto-delete.
>> 
>> Wes
>> 
> 
> 
> 
> -- 
> Benjamin Manns
> benma...@gmail.com
> (434) 321-8324
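
A minimal sketch of the kind of "vacuum" process described above, using the
kafka-python client as an example (the client library, the topic name, the
expiry logic, and all names here are assumptions, not something we actually
run): consume the compacted topic, track the last time each key was seen, and
produce a null value (tombstone) for keys that have gone quiet.

    from kafka import KafkaConsumer, KafkaProducer
    import time

    TOPIC = "sessions"          # hypothetical compacted topic
    TTL_SECONDS = 30 * 60       # expire keys not updated for 30 minutes

    consumer = KafkaConsumer(TOPIC, bootstrap_servers="broker:9092")
    producer = KafkaProducer(bootstrap_servers="broker:9092")

    last_seen = {}
    while True:
        records = consumer.poll(timeout_ms=1000)
        now = time.time()
        for batch in records.values():
            for record in batch:
                if record.value is None:
                    last_seen.pop(record.key, None)   # already tombstoned
                else:
                    last_seen[record.key] = now
        for key, seen in list(last_seen.items()):
            if now - seen > TTL_SECONDS:
                producer.send(TOPIC, key=key, value=None)  # tombstone the key
                del last_seen[key]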



Re: compacted log TTLs

2016-05-13 Thread Wesley Chow
Yes, also classic caching, where you might use memcache with TTLs.

But a different use case for us is sessionizing. We push a high rate of updates 
coming from a browser session to our Kafka cluster. If we don’t see an update 
for a particular session after some period of time, we say that session has 
expired and want to delete it. Compacted logs seem great for this; however, 
without TTLs we’d have to consume these updates to figure out when to expire 
the session. I can go into more detail if that’s not clear.

The general case here is that sometimes you want a kv store that doesn’t exceed 
some resource bound. In the case of caching, you may not want to exceed some 
time bound, but you may also not want to exceed some space bound. You can 
totally deal with these bounds with a consumer, but if the rate of updates to 
the keys is high then this could be an expensive proposition. In the case of my 
sessionizing problem, consuming that data to deal with expirations can easily 
add tens of thousands of dollars in inter-AZ costs per year (not to mention the 
servers to run the extra consumers), so having it taken care of in the brokers 
is actually very useful.

Wes


> On May 12, 2016, at 8:25 PM, Peter Davis  wrote:
> 
> One use case is implementing a data retention policy.
> 
> -Peter
> 
> 
>> On May 12, 2016, at 17:11, Guozhang Wang  wrote:
>> 
>> Wesley,
>> 
>> Could describe your use case a bit more for motivating this? Is your data
>> source expiring records and hence you want to auto "delete" the
>> corresponding Kafka records as well?
>> 
>> Guozhang
>> 
>>> On Thu, May 12, 2016 at 2:35 PM, Wesley Chow  wrote:
>>> 
>>> Right, I’m trying to avoid explicitly managing TTLs. It’s nice being able
>>> to just produce keys into Kafka without having an accompanying vacuum
>>> consumer.
>>> 
>>> Wes
>>> 
>>> 
>>>> On May 12, 2016, at 5:15 PM, Benjamin Manns  wrote:
>>>> 
>>>> If you send a NULL value to a compacted log, after the retention period
>>> it
>>>> will be removed. You could run a process that reprocesses the log and
>>> sends
>>>> a NULL to keys you want to purge based on some custom logic.
>>>> 
>>>> On Thu, May 12, 2016 at 2:01 PM, Wesley Chow  wrote:
>>>> 
>>>>> Are there any thoughts on supporting TTLs on keys in compacted logs? In
>>>>> other words, some way to set on a per-key basis a time to auto-delete.
>>>>> 
>>>>> Wes
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Benjamin Manns
>>>> benma...@gmail.com
>>>> (434) 321-8324
>> 
>> 
>> -- 
>> -- Guozhang



high rate of small packets between brokers

2016-07-05 Thread Wesley Chow
I’ve been investigating some possible network performance issues we’re having 
with our Kafka brokers, and noticed that traffic sent between brokers tends to 
show frequent bursts of very small packets:

16:09:52.299863 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 127908:127925, ack 4143, win 32488, length 17
16:09:52.299870 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 127925:127943, ack 4143, win 32488, length 18
16:09:52.299876 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 127943:127967, ack 4143, win 32488, length 24
16:09:52.299889 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 127967:127985, ack 4143, win 32488, length 18
16:09:52.299892 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 127985:127999, ack 4143, win 32488, length 14
16:09:52.299895 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 127999:128017, ack 4143, win 32488, length 18
16:09:52.299897 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 128017:128031, ack 4143, win 32488, length 14
16:09:52.299900 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39399: 
Flags [P.], seq 128031:128049, ack 4143, win 32488, length 18
16:09:52.300612 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39400: 
Flags [P.], seq 279162:279178, ack 6700, win 32488, length 16
16:09:52.300645 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39400: 
Flags [P.], seq 279178:279189, ack 6700, win 32488, length 11
16:09:52.300655 IP stream02.chartbeat.net.9092 > stream03.chartbeat.net.39400: 
Flags [P.], seq 279189:279207, ack 6700, win 32488, length 18

I don’t know if this in itself is really an issue, but I thought I’d check with 
the group to see. The MTU on the interfaces is set to 9001, and regular 
consumers don’t get the same bursts of small push packets. Our replica config 
is:

replica.lag.time.max.ms=1
replica.lag.max.messages=4000
replica.socket.timeout.ms=301000
replica.socket.receive.buffer.bytes=641024
replica.fetch.max.bytes=10241024
replica.fetch.wait.max.ms=500
replica.fetch.min.bytes=1
num.replica.fetchers=16

Any thoughts on whether or not this is an issue, and if so how we should 
correct it? I’m wondering about the replica.fetch.*.bytes settings — it’s 
unclear to me from the docs what those do exactly.

Thanks,
Wes