[jira] [Created] (KAFKA-5973) ShutdownableThread catching errors can lead to partial hard to diagnose broker failure

2017-09-25 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-5973:
---

 Summary: ShutdownableThread catching errors can lead to partial 
hard to diagnose broker failure
 Key: KAFKA-5973
 URL: https://issues.apache.org/jira/browse/KAFKA-5973
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.0, 0.11.0.1
Reporter: Tom Crayford
Priority: Minor
 Fix For: 1.0.0, 0.11.0.2


When any Kafka broker {{ShutdownableThread}} subclass crashes due to an
uncaught exception, the broker is left running in a degraded, hard to diagnose
state: some threads are no longer running, yet the broker may still be serving
traffic to users while not performing its usual background work.

This is problematic, because monitoring may say that "the broker is up and 
fine", but in fact it is not healthy.

At Heroku we've been mitigating this by monitoring all threads that "should" be
running on a broker and alerting when a given thread isn't running for some
reason.

Things that use {{ShutdownableThread}} that can crash and leave a broker/the 
controller in a bad state:
- log cleaner
- replica fetcher threads
- controller to broker send threads
- controller topic deletion threads
- quota throttling reapers
- io threads
- network threads
- group metadata management threads

Some of these can have disastrous consequences, and a crash in nearly any of 
them, for any reason, is cause for an alert. However, users probably shouldn't 
have to know about all of Kafka's internals and run thread dumps periodically 
as part of normal operations.

There are a few potential options here:

1. On the crash of any {{ShutdownableThread}}, shutdown the whole broker process

We could crash the whole broker when an individual thread dies. I think this is 
pretty reasonable: it's better to have a very visible breakage than one that is 
very hard to detect.
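
A rough sketch of what this might look like, assuming a simplified 
{{ShutdownableThread}}-style run loop (the class and names below are 
illustrative, not Kafka's actual internals):

{code}
// Illustrative only: a simplified ShutdownableThread-style run loop that halts
// the whole process when doWork() throws, rather than dying silently.
abstract class ExampleShutdownableThread(name: String) extends Thread(name) {
  @volatile private var running = true

  def doWork(): Unit

  override def run(): Unit =
    try {
      while (running) doWork()
    } catch {
      case t: Throwable =>
        System.err.println(s"Thread $name died with uncaught exception, halting broker: $t")
        // Make the breakage visible: take the whole broker down.
        Runtime.getRuntime.halt(1)
    }

  def initiateShutdown(): Unit = running = false
}
{code}

Halting rather than attempting a graceful shutdown is deliberate in this sketch: 
the goal is a loud, visible failure.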

2. Add some healthcheck JMX bean to detect these thread crashes

Users having to audit all of Kafka's source code on each new release and track 
a list of "threads that should be running" is... pretty silly. We could instead 
expose a JMX bean of some kind indicating threads that have died due to 
uncaught exceptions.
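
A rough sketch of what option 2 could look like using a plain standard MBean 
(all names here are hypothetical, not an existing Kafka metric):

{code}
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicInteger
import javax.management.ObjectName

// Hypothetical health bean: counts threads that died from uncaught exceptions,
// so monitoring can alert on any non-zero value.
trait DeadThreadCountMBean { def getDeadThreadCount: Int }

class DeadThreadCount extends DeadThreadCountMBean {
  private val count = new AtomicInteger(0)
  def recordThreadDeath(): Unit = count.incrementAndGet()
  override def getDeadThreadCount: Int = count.get()
}

object ThreadHealth {
  val deadThreads = new DeadThreadCount
  ManagementFactory.getPlatformMBeanServer
    .registerMBean(deadThreads, new ObjectName("kafka.example:type=DeadThreadCount"))
}
{code}

Each thread's uncaught-exception handler would call {{recordThreadDeath()}} 
before the thread exits, and monitoring would alert on any non-zero value.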

3. Do nothing, but add documentation around monitoring/logging that exposes 
this error

These thread deaths *do* emit log lines, but it's not clear or obvious to 
users that they need to monitor and alert on them. The project could add 
documentation making that explicit.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-4596) KIP-73 rebalance throttling breaks on plans for specific partitions

2017-01-05 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-4596:
---

 Summary: KIP-73 rebalance throttling breaks on plans for specific 
partitions
 Key: KAFKA-4596
 URL: https://issues.apache.org/jira/browse/KAFKA-4596
 Project: Kafka
  Issue Type: Bug
 Environment: Kafka 0.10.1.1
Reporter: Tom Crayford


The kafka-reassign-partitions.sh command fails if you both *throttle* and give 
it a specific partition reassignment. For example, upon reassigning 
{{__consumer_offsets}} partition 19, you get the following error:

{code}
Save this to use as the --reassignment-json-file option during rollback
Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value.
The throttle limit was set to 1048576 B/s
Partitions reassignment failed due to key not found: [__consumer_offsets,30]
java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:59)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:59)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
    at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
    at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
    at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
    at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
    at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
    at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
    at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
{code}

This effectively breaks the throttling feature unless you want to rebalance 
many many partitions at once.

For reference the command that was run is:

{code}
kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
{code}

and the contents of the plan file are:

{code}
{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
{code}

This seems like a simple logic error to me: we're looking up a partition that 
wasn't part of the proposed reassignment, when we shouldn't be. It looks like 
the logic assumes that {{Map.apply}} doesn't throw if the key isn't found, when 
in fact it does.
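
A minimal illustration of that difference (plain Scala, not the Kafka code itself):

{code}
val proposed = Map(("__consumer_offsets", 19) -> Seq(2, 1, 0))

// Map.get is total: a missing key just gives None.
println(proposed.get(("__consumer_offsets", 30)))   // None

// Map.apply is partial: a missing key throws NoSuchElementException,
// which is exactly the error kafka-reassign-partitions.sh prints above.
try proposed(("__consumer_offsets", 30))
catch { case e: NoSuchElementException => println(s"lookup failed: ${e.getMessage}") }
{code}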

I checked that this cluster does indeed have the __consumer_offsets topic 
populated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4441) Kafka Monitoring is incorrect during rapid topic creation and deletion

2016-11-24 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-4441:
---

 Summary: Kafka Monitoring is incorrect during rapid topic creation 
and deletion
 Key: KAFKA-4441
 URL: https://issues.apache.org/jira/browse/KAFKA-4441
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.1, 0.10.0.0
Reporter: Tom Crayford
Priority: Minor


Kafka reports several metrics based on the state of partitions:
- UnderReplicatedPartitions
- PreferredReplicaImbalanceCount
- OfflinePartitionsCount

All of these metrics fire when rapidly creating and deleting topics in a tight 
loop, even though the only cause is topics that are undergoing 
creation/deletion and the cluster is otherwise stable.

Looking through the source code, topic deletion goes through an asynchronous 
state machine: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/TopicDeletionManager.scala#L35.


However, the metrics do not know about the progress of this state machine: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L185
 


I believe the fix is relatively simple: make the metrics aware that a topic is 
currently undergoing deletion or creation, and only include topics that are 
"stable".
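
As a hedged sketch of that idea (stand-in types and names, not the real 
controller code), the gauges would filter on creation/deletion state before 
counting:

{code}
// Stand-in for whatever per-partition state the controller tracks.
case class PartitionState(replicas: Seq[Int], isr: Seq[Int])

// Only count partitions of "stable" topics: skip anything still being
// created or queued for deletion, so the gauge reflects steady-state health.
def underReplicatedCount(
    partitionsByTopic: Map[String, Seq[PartitionState]],
    topicsBeingDeleted: Set[String],
    topicsBeingCreated: Set[String]): Int =
  partitionsByTopic
    .filter { case (topic, _) => !topicsBeingDeleted(topic) && !topicsBeingCreated(topic) }
    .values
    .flatten
    .count(p => p.isr.size < p.replicas.size)
{code}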



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2016-08-24 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-4084:
---

 Summary: automated leader rebalance causes replication downtime 
for clusters with too many partitions
 Key: KAFKA-4084
 URL: https://issues.apache.org/jira/browse/KAFKA-4084
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.10.0.1, 0.10.0.0, 0.9.0.1, 0.8.2.2, 0.9.0.0
Reporter: Tom Crayford
 Fix For: 0.10.1.0


If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
you have a cluster with many partitions, there is severe replication downtime 
following a restart. This causes {{UnderReplicatedPartitions}} to fire, and 
replication is paused.

This is because the current automated leader rebalance mechanism changes 
leaders for *all* imbalanced partitions at once, instead of doing it gradually. 
This effectively stops all replica fetchers in the cluster (assuming there are 
enough imbalanced partitions), and restarts them. This can take minutes on busy 
clusters, during which no replication is happening and user data is at risk. 
Clients with {{acks=-1}} also see issues at this time, because replication is 
effectively stalled.

To quote Todd Palino from the mailing list:


bq. There is an admin CLI command to trigger the preferred replica election 
manually. There is also a broker configuration “auto.leader.rebalance.enable” 
which you can set to have the broker automatically perform the PLE when needed. 
DO NOT USE THIS OPTION. There are serious performance issues when doing so, 
especially on larger clusters. It needs some development work that has not been 
fully identified yet.

This setting is extremely useful for smaller clusters, but with high partition 
counts it causes the serious issues described above.

One potential fix could be adding a new configuration for the number of 
partitions to rebalance leadership for at once, and *stopping* once that many 
leader rebalances are in flight, until they're done. There may be better 
mechanisms, and I'd love to hear if anybody has any ideas.
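
A rough sketch of that shape of fix (the names and the per-pass cap are 
hypothetical, not an existing Kafka configuration):

{code}
// Stand-in for Kafka's topic/partition identifier.
case class TopicAndPartition(topic: String, partition: Int)

// Pick at most `maxElectionsPerPass` imbalanced partitions to move this pass,
// accounting for elections already in flight; the rest wait for a later pass.
def partitionsToRebalanceNow(
    imbalancedPartitions: Seq[TopicAndPartition],
    electionsInFlight: Int,
    maxElectionsPerPass: Int): Seq[TopicAndPartition] = {
  val budget = math.max(0, maxElectionsPerPass - electionsInFlight)
  imbalancedPartitions.take(budget)
}
{code}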



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3894) LogCleaner thread crashes if not even one segment can fit in the offset map

2016-08-11 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417675#comment-15417675
 ] 

Tom Crayford commented on KAFKA-3894:
-

Hi Jun,

We're probably going to start on b. for now. I think a. is incredibly valuable, 
but it doesn't address this particular way the log cleaner crashes. I think 
there are some cases where we will fail to clean up data, but having those 
exist seems far preferable to crashing the thread entirely.

We'll get started with b., hopefully will have a patch up within a few business 
days.

> LogCleaner thread crashes if not even one segment can fit in the offset map
> ---
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Oracle JDK 8
> Ubuntu Precise
>Reporter: Tim Carey-Smith
>  Labels: compaction
> Fix For: 0.10.1.0
>
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be 
> too large to fit into the dedupe buffer. 
> The result of this is a log line: 
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner 
> \[kafka-log-cleaner-thread-0\], Error due to  
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in 
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit 
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where 
> cleaning of compacted topics is not running. 
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason 
> for this upper bound, or if this is a value which must be tuned for each 
> specific use-case. 
> Of more immediate concern is the fact that the thread crash is not visible 
> via JMX or exposed as some form of service degradation. 
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3976) Kafka Controller gets stuck, potentially due to zookeeper session id reuse

2016-07-19 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-3976:
---

 Summary: Kafka Controller gets stuck, potentially due to zookeeper 
session id reuse
 Key: KAFKA-3976
 URL: https://issues.apache.org/jira/browse/KAFKA-3976
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0, 0.9.0.1
Reporter: Tom Crayford


We've been seeing an issue with a Kafka controller getting "stuck" under 
0.9.0.1. We carefully monitor {{ActiveControllerCount}} on all brokers, and 
upon finding it to be 0 for long enough, page an operator. In this particular 
case (and we've seen similar behaviour a few times before), it appears as 
though some unusual conditions led to the cluster not re-electing a controller.

This could also potentially be a zookeeper client bug. We've spent a while 
looking through the code, and it seems like a session gets re-established, but 
the zookeeper session state callbacks don't seem to get fired on all the 
listeners properly, which leads to the controller being stuck.

Note that with the cluster in this bad state, we've got a znode at /controller 
and /brokers/ids/0 on this bad node:

{code}
get /controller
{"version":1,"brokerid":0,"timestamp":"1468870986582"}
cZxid = 0x2a646
ctime = Mon Jul 18 19:43:06 UTC 2016
mZxid = 0x2a646
mtime = Mon Jul 18 19:43:06 UTC 2016
pZxid = 0x2a646
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x155e49f2b230004
dataLength = 54
numChildren = 0
get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1468870990738","endpoints":["SSL://REDACTED"],"host":null,"version":2,"port":-1}
cZxid = 0x2a64e
ctime = Mon Jul 18 19:43:10 UTC 2016
mZxid = 0x2a64e
mtime = Mon Jul 18 19:43:10 UTC 2016
pZxid = 0x2a64e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x155e49f2b230004
dataLength = 144
numChildren = 0
{code}

However, that broker definitely doesn't consider itself the active controller; 
its {{ActiveControllerCount}} is 0 (this is a response from Jolokia):

{code}
{"request":{"mbean":"kafka.controller:name=ActiveControllerCount,type=KafkaController","type":"read"},"value":{"Value":0},"timestamp":1468936463,"status":200}
{code}
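
For reference, the check we page on is roughly the following (a sketch, not our 
actual alerting code):

{code}
// Across a healthy cluster, exactly one broker should report
// ActiveControllerCount == 1. A sum of 0 (as above) means a stuck or missing
// controller; more than 1 means a split brain.
def controllerHealthy(activeControllerCountByBroker: Map[String, Int]): Boolean =
  activeControllerCountByBroker.values.sum == 1
{code}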

If you look at the attached log files, you can see that *both* brokers went 
through a brief period where they thought they were the controller, then both 
resigned, and the callbacks meant to fire upon establishment of new sessions 
did not fire properly, which led to a "stuck" controller.

It's perhaps worth noting that these brokers have *no* more log output at all 
since this point. They are relatively unused though, so that is not that 
surprising.

Logs are here (this is a two-broker cluster): 
https://gist.github.com/tcrayford/cbf3d5aa1c46194eb7a98786d83b0eab
Sensitive data has been redacted and replaced with REDACTED_$TYPE.
The most interesting stuff happens right at the bottom of each log file, where 
it appears a zookeeper session times out and is then re-established. From my 
understanding of the ZAB protocol, Kafka shouldn't be reusing session ids here, 
but it seems to be doing so. That's potentially the issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts

2016-07-14 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376858#comment-15376858
 ] 

Tom Crayford commented on KAFKA-3894:
-

Jun:

#4 seems potentially very complex to me. It also doesn't work in the case where 
the broker is shut down and the dedupe buffer size is adjusted. I much prefer 
#3 - it maps fine onto the existing model as far as I can tell - we'd "just" 
split the log file we're cleaning once the offset map is full. That of course 
requires a little more IO, but it doesn't involve implementing (or using a 
library for) sketches that could potentially be incorrect. It also seems like 
the right long-term solution, and more robust than automatically rolling log 
files some of the time. Am I missing something here?
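
To make #3 a bit more concrete, here's a rough sketch of the planning step I 
have in mind (purely illustrative, not the real LogCleaner code): rather than 
refusing to clean a segment whose keys don't fit, clean it in chunks sized to 
the offset map and let later passes handle the remainder.

{code}
// Pessimistically assume one distinct key per message when chunking, so each
// chunk's keys are guaranteed to fit in the offset map.
case class CleanChunk(startOffset: Long, endOffset: Long)

def planCleaning(segmentStart: Long, segmentEnd: Long, offsetMapCapacity: Long): Seq[CleanChunk] = {
  require(offsetMapCapacity > 0, "offset map must hold at least one key")
  Iterator
    .iterate(segmentStart)(_ + offsetMapCapacity)
    .takeWhile(_ < segmentEnd)
    .map(start => CleanChunk(start, math.min(start + offsetMapCapacity, segmentEnd)))
    .toSeq
}
{code}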

Upsides of #3 vs #4:
- We can now clean the largest log segment, no matter the buffer size.
- We don't increase the complexity of the produce path, or change memory usage.
- We don't have to implement or reuse a library for estimating unique keys.
- We don't have to figure out where to store the key estimate (e.g. in the index or in a new file alongside each segment).

Downsides:
- It would increase the complexity of the cleaner.
- The code that swaps segments in and out will also get more complex, and the crash-safety of that code is already tricky.

Exists in both:
- Larger log segments could potentially be split a lot, and not always deduplicated that well together. For example, if I write the maximum number of unique keys the offset map can hold into a topic, then the segment rolls, then I write a tombstone for every key in the previously sent messages, neither #3 nor #4 would ever clear up any data. This is no worse than today though.

Cassandra and other LSM based systems that do log structured storage and 
over-time compaction use similar "splitting and combining" mechanisms to ensure 
everything gets cleared up over time without using too much memory. They have a 
very different storage architecture and goals to Kafka's compaction, for sure, 
but it's interesting to note that they care about similar things.

> Log Cleaner thread crashes and never restarts
> -
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
>Reporter: Tim Carey-Smith
>  Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be 
> too large to fit into the dedupe buffer. 
> The result of this is a log line: 
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner 
> \[kafka-log-cleaner-thread-0\], Error due to  
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in 
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit 
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where 
> cleaning of compacted topics is not running. 
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason 
> for this upper bound, or if this is a value which must be tuned for each 
> specific use-case. 
> Of more immediate concern is the fact that the thread crash is not visible 
> via JMX or exposed as some form of service degradation. 
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2016-07-12 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-3955:
---

 Summary: Kafka log recovery doesn't truncate logs on non-monotonic 
offsets, leading to failed broker boot
 Key: KAFKA-3955
 URL: https://issues.apache.org/jira/browse/KAFKA-3955
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2, 0.9.0.0, 0.8.2.1, 0.8.2.0, 
0.8.1.1, 0.8.1, 0.8.0
Reporter: Tom Crayford


Hi,

I've found a bug impacting Kafka brokers on startup after an unclean shutdown. 
If a log segment is corrupt and has non-monotonic offsets (see the appendix of 
this bug for sample output from {{DumpLogSegments}}), then 
{{LogSegment.recover}} throws an {{InvalidOffsetException}}: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218

That code is called by {{LogSegment.recover}}: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191

Which is called in several places in {{Log.scala}}. Notably it's called four 
times during recovery:

Thrice in Log.loadSegments
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226

and once in Log.recoverLog

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268

Of these, only the very last one has a {{catch}} for 
{{InvalidOffsetException}}. When that catch fires, it truncates the whole log 
(not just the bad segment) back to the start of the offending segment: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274

However, this code can't be hit during recovery, because of the code paths in 
{{loadSegments}}: the exception is thrown there first, propagates all the way 
to the top-level exception handler, and crashes the JVM before the truncation 
code is ever reached.

As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
this is to move the crash recovery/truncate code into a new method in 
{{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
That method should truncate the log and return the number of 
{{truncatedBytes}}, so callers can be told to stop iterating over the remaining 
files in the directory, much as {{Log.recoverLog}} does right now.
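
A rough sketch of the shape I have in mind, with the real {{Log}}/{{LogSegment}} 
API replaced by stand-in function parameters:

{code}
// Stand-in for kafka.common.InvalidOffsetException.
final case class InvalidOffsetException(msg: String) extends RuntimeException(msg)

// Try to recover each segment in order; on a non-monotonic offset, truncate
// from that segment onwards instead of letting the exception crash startup.
// A non-zero return tells the caller to stop iterating over remaining files.
def recoverSegments(
    segmentBaseOffsets: Seq[Long],
    recoverOne: Long => Unit,        // throws InvalidOffsetException on corruption
    truncateFrom: Long => Long       // drops the segment and its successors, returns bytes removed
): Long = {
  for (base <- segmentBaseOffsets) {
    try recoverOne(base)
    catch {
      case _: InvalidOffsetException =>
        return truncateFrom(base)
    }
  }
  0L
}
{code}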

I'm happy working on a patch for this. I'm aware this recovery code is tricky 
and important to get right.

I'm also curious (and don't have any good theories yet) as to how this log 
segment got into this state with non-monotonic offsets. This segment uses gzip 
compression and was written under 0.9.0.1. The same recovery bug exists in 
trunk, but I'm unsure whether the new handling of compressed messages (KIP-31) 
means the bug that appends non-monotonic offsets is still present there.

As a production workaround, you can manually truncate that log folder yourself 
(delete all .index/.log files from the one with the bad offset onwards). 
However, Kafka should (and can) handle this case well: with replication, the 
broker can safely truncate during startup.

stacktrace and error message:

{code}
pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding index...
pri=ERROR t=main at=LogManager There was an error in one of the threads during logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
    at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
    at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
    at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
    at kafka.log.LogSegment.recover(LogSegment.scala:188)
    at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
    at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at ...
{code}

[jira] [Created] (KAFKA-3937) Kafka Clients Leak Native Memory For Longer Than Needed With Compressed Messages

2016-07-08 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-3937:
---

 Summary: Kafka Clients Leak Native Memory For Longer Than Needed 
With Compressed Messages
 Key: KAFKA-3937
 URL: https://issues.apache.org/jira/browse/KAFKA-3937
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2
 Environment: Linux, latest oracle java-8
Reporter: Tom Crayford
Priority: Minor


In https://issues.apache.org/jira/browse/KAFKA-3933, we discovered that brokers 
can crash when performing log recovery, as they leak native memory whilst 
decompressing compressed segments, and that native memory isn't cleaned up 
rapidly enough by garbage collection and finalizers. The work to fix that in 
the brokers is happening in https://github.com/apache/kafka/pull/1598. As 
part of that PR, Ismael Juma asked me to fix similar issues in the clients. 
Rather than have one large PR that fixes everything, I'd rather break this work 
up into separate pieces, so I'm filing this JIRA to track the follow-up work. I 
should get to a PR on this relatively soon, once the other PR has landed.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-08 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367625#comment-15367625
 ] 

Tom Crayford commented on KAFKA-3933:
-

Ismael: I've pushed the PR here: https://github.com/apache/kafka/pull/1598. For 
now, I've only fixed the precise memory issue that has been causing us notable 
production issues. I'm happy picking up other parts of the codebase where this 
can happen, but would rather get feedback on the first part of the approach for 
now (this is my first time contributing code to Kafka). I couldn't see a good 
or easy way to write any unit tests for this code right now.

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Assignee: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits an 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflatInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd 

[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366721#comment-15366721
 ] 

Tom Crayford commented on KAFKA-3933:
-

Done. I hope to work on a fix tomorrow.

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Assignee: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits an 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflatInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd report the 
> issue before starting down that path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Crayford reassigned KAFKA-3933:
---

Assignee: Tom Crayford

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Assignee: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits an 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflatInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd report the 
> issue before starting down that path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366329#comment-15366329
 ] 

Tom Crayford commented on KAFKA-3933:
-

That makes sense to me. Thanks for the pointer.

> Kafka OOM During Log Recovery Due to Leaked Native Memory
> -
>
> Key: KAFKA-3933
> URL: https://issues.apache.org/jira/browse/KAFKA-3933
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Linux, latest oracle java-8
>Reporter: Tom Crayford
>Priority: Critical
> Fix For: 0.10.0.1
>
>
> Hi there. We've been tracking an issue where Kafka hits an 
> java.lang.OutOfMemoryError during log recovery.
> After a bunch of tracking work, we've realized we've hit an instance of a 
> long known issue: http://www.evanjones.ca/java-native-leak-bug.html
> TLDR: Kafka breaks the rule "Always close GZIPInputStream and 
> GZIPOutputStream since they use native memory via zlib" from that article.
> As such, during broker startup, when you're recovering log segments that have 
> been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
> Our crashes during startup have this profile - the JVM heap is empty (a few 
> hundred MB), but the offheap memory is full of allocations caused by 
> `Java_java_util_zip_Deflater_init` and `deflatInit2`.
> This leads to broker crashes during startup. The only real mitigation is 
> having *far* more memory than you need to boot (which I'd guess is why folk 
> haven't noticed this in production that much yet).
> To dig into the code more (this is based on trunk). Log recovery on unflushed 
> segments eventually calls `LogSegment.recover`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172
> On compressed segments, that leads to a call to `deepIterator`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
> That leads to a call to `CompressionFactory`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
>  which creates a `GZIPInputStream`: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46
> That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
> means that the finalizer on `GZIPInputStream` that deallocates the native 
> buffers is never called, because GC is never triggered. Instead, we just 
> exhaust the offheap memory and then Kafka dies from an OutOfMemory error.
> Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
> reading the whole input stream (see 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
>  When it's performing log recovery, in 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
>  it doesn't read to the end of the stream, but instead reads the first offset 
> and leaves things alone.
> This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
> same way. I think (but haven't 100% verified) that it impacts all versions of 
> Kafka that are supported (0.8 -> 0.10).
> Fixing this seems relatively annoying, but only because of some "small 
> matters of coding", nothing hugely problematic.
> The main issue is that `deepIterator` only returns an `Iterator`, which 
> doesn't have a `close()` method of any kind. We could create a new 
> `ClosableIterator` trait and have it extend Java's `AutoCloseable` 
> (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), 
> then explicitly call `close()` everywhere we use a `deepIterator()` and don't 
> always read to the end. Scala unfortunately doesn't seem to have a built in 
> version of Java's `try-with-resources` statement, but we can explicitly call 
> close everywhere perfectly happily.
> Another (but much more hacky) solution would be to always read to the end of 
> the iterator in `LogSegment.recover`, but that seems pretty bad, using far 
> more resources than is needed during recovery.
> I can't think of any other reasonable solutions for now, but would love to 
> hear input from the community.
> We're happy doing the work of developing a patch, but thought we'd report the 
> issue before starting down that path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory

2016-07-07 Thread Tom Crayford (JIRA)
Tom Crayford created KAFKA-3933:
---

 Summary: Kafka OOM During Log Recovery Due to Leaked Native Memory
 Key: KAFKA-3933
 URL: https://issues.apache.org/jira/browse/KAFKA-3933
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2
 Environment: Linux, latest oracle java-8
Reporter: Tom Crayford


Hi there. We've been tracking an issue where Kafka hits a 
java.lang.OutOfMemoryError during log recovery.
After a bunch of tracking work, we've realized we've hit an instance of a 
long-known issue: http://www.evanjones.ca/java-native-leak-bug.html

TLDR: Kafka breaks the rule "Always close GZIPInputStream and GZIPOutputStream 
since they use native memory via zlib" from that article.

As such, during broker startup, when you're recovering log segments that have 
been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place.
Our crashes during startup have this profile: the JVM heap is nearly empty (a 
few hundred MB), but the off-heap memory is full of allocations caused by 
`Java_java_util_zip_Deflater_init` and `deflatInit2`.
This leads to broker crashes during startup. The only real mitigation is having 
*far* more memory than you need to boot (which I'd guess is why folks haven't 
noticed this much in production yet).

To dig into the code more (this is based on trunk): log recovery on unflushed 
segments eventually calls `LogSegment.recover`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172

On compressed segments, that leads to a call to `deepIterator`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189

That leads to a call to `CompressionFactory`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95
 which creates a `GZIPInputStream`: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46

That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure 
means that the finalizer on `GZIPInputStream` that deallocates the native 
buffers is never called, because GC is never triggered. Instead, we just 
exhaust the offheap memory and then Kafka dies from an OutOfMemory error.

Kafka *does* trigger an `inputstream.close()` call, but only when *fully* 
reading the whole input stream (see 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156).
 When it's performing log recovery, in 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189
 it doesn't read to the end of the stream, but instead reads the first offset 
and leaves things alone.

This issue likely impacts `lz4` and `snappy` compressed topics in exactly the 
same way. I think (but haven't 100% verified) that it impacts all versions of 
Kafka that are supported (0.8 -> 0.10).

Fixing this seems relatively annoying, but only because of some "small matters 
of coding", nothing hugely problematic.

The main issue is that `deepIterator` only returns an `Iterator`, which doesn't 
have a `close()` method of any kind. We could create a new `ClosableIterator` 
trait and have it extend Java's `AutoCloseable` 
(https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), then 
explicitly call `close()` everywhere we use a `deepIterator()` and don't always 
read to the end. Scala unfortunately doesn't seem to have a built in version of 
Java's `try-with-resources` statement, but we can explicitly call close 
everywhere perfectly happily.
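
A quick sketch of that `ClosableIterator` idea (illustrative only, not the 
eventual API): the iterator owns the underlying `GZIPInputStream`, and a small 
loan helper guarantees `close()` runs even when the caller stops after the 
first message.

{code}
import java.io.Closeable

// An iterator that owns a native resource (e.g. the GZIPInputStream backing a
// deep iterator) and must be closed even if not fully consumed.
trait ClosableIterator[T] extends Iterator[T] with Closeable

object ClosableIterator {
  // Loan pattern: a poor man's try-with-resources for Scala.
  def using[T, R](iter: ClosableIterator[T])(f: ClosableIterator[T] => R): R =
    try f(iter) finally iter.close()
}

// Usage during recovery would look roughly like:
//   ClosableIterator.using(deepIterator(segment)) { it => it.next().offset }
// where deepIterator is assumed to return a ClosableIterator.
{code}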

Another (but much more hacky) solution would be to always read to the end of 
the iterator in `LogSegment.recover`, but that seems pretty bad, using far more 
resources than is needed during recovery.

I can't think of any other reasonable solutions for now, but would love to hear 
input from the community.

We're happy doing the work of developing a patch, but thought we'd report the 
issue before starting down that path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts

2016-06-23 Thread Tom Crayford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346463#comment-15346463
 ] 

Tom Crayford commented on KAFKA-3894:
-

(disclaimer: I work with Tim)

It feels like there are a few pieces of work to do here:

1. Expose the log cleaner state as a JMX metric (like BrokerState).
2. Somehow mark logs we've failed to clean as "busted" and stop trying to clean 
them. That way, instead of staying completely broken when this happens, the 
broker continues working on all other partitions (see the sketch after this list).
3. I'm unsure, but is it possible to fix the underlying issue by only 
compacting partial segments of the log when the buffer size is smaller than the 
desired offset map? This seems like the hardest but most valuable fix here.
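
A rough sketch of idea 2 (purely illustrative, not the real LogCleaner state 
handling):

{code}
import scala.collection.mutable

// Remember partitions whose cleaning failed and skip them on later passes,
// so one bad partition doesn't stop compaction for everything else.
class CleanerFailureTracker {
  private val uncleanable = mutable.Set.empty[(String, Int)]

  def markFailed(topic: String, partition: Int): Unit =
    uncleanable += ((topic, partition))

  def isCleanable(topic: String, partition: Int): Boolean =
    !uncleanable.contains((topic, partition))
}
{code}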

We're happy picking up at least some of these, but would love feedback from the 
community about priorities and ease/appropriateness of these steps (and 
suggestions for other things to have).

> Log Cleaner thread crashes and never restarts
> -
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
>Reporter: Tim Carey-Smith
>  Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be 
> too large to fit into the dedupe buffer. 
> The result of this is a log line: 
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner 
> \[kafka-log-cleaner-thread-0\], Error due to  
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in 
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit 
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where 
> cleaning of compacted topics is not running. 
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason 
> for this upper bound, or if this is a value which must be tuned for each 
> specific use-case. 
> Of more immediate concern is the fact that the thread crash is not visible 
> via JMX or exposed as some form of service degradation. 
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)