[jira] [Created] (KAFKA-5973) ShutdownableThread catching errors can lead to partial hard to diagnose broker failure
Tom Crayford created KAFKA-5973:
---

Summary: ShutdownableThread catching errors can lead to partial hard to diagnose broker failure
Key: KAFKA-5973
URL: https://issues.apache.org/jira/browse/KAFKA-5973
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.11.0.0, 0.11.0.1
Reporter: Tom Crayford
Priority: Minor
Fix For: 1.0.0, 0.11.0.2

When any of a Kafka broker's {{ShutdownableThread}} subclasses crashes due to an uncaught exception, the broker is left running in a weird, partially broken state: some threads are no longer running, yet the broker may still be serving traffic to users while not performing its usual background operations. This is problematic because monitoring may say "the broker is up and fine" when in fact it is not healthy. At Heroku we've been mitigating this by monitoring all threads that "should" be running on a broker and alerting when a given thread isn't running for some reason.

Things that use {{ShutdownableThread}} and can crash, leaving a broker or the controller in a bad state:

- log cleaner
- replica fetcher threads
- controller to broker send threads
- controller topic deletion threads
- quota throttling reapers
- io threads
- network threads
- group metadata management threads

Some of these crashing can have disastrous consequences, and nearly all of them crashing for any reason is cause for an alert. But users shouldn't have to know about all the internals of Kafka and run thread dumps periodically as part of normal operations.

There are a few potential options here:

1. On the crash of any {{ShutdownableThread}}, shut down the whole broker process

We could crash the whole broker when an individual thread dies. I think this is pretty reasonable: it's better to have a very visible breakage than a very hard to detect one.

2. Add a healthcheck JMX bean to detect these thread crashes

Users having to audit all of Kafka's source code on each new release and track a list of "threads that should be running" is... pretty silly. We could instead expose a JMX bean of some kind indicating threads that died due to uncaught exceptions.

3. Do nothing, but add documentation around monitoring/logging that exposes this error

These thread deaths *do* emit log lines, but it's not clear or obvious to users that they need to monitor and alert on them. The project could add documentation covering this.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
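To make option 1 concrete, here is a minimal, hypothetical sketch (not the actual {{kafka.utils.ShutdownableThread}} implementation) of escalating an uncaught exception in a background thread into a visible, whole-process failure:

{code}
// Hypothetical sketch only; the real ShutdownableThread differs. The point is
// that an uncaught Throwable halts the process instead of leaving the broker
// half-alive with this thread silently missing.
abstract class FailFastThread(name: String) extends Thread(name) {
  @volatile private var running = true

  def doWork(): Unit

  def initiateShutdown(): Unit = { running = false; interrupt() }

  override def run(): Unit =
    try {
      while (running) doWork()
    } catch {
      case _: InterruptedException => // normal shutdown path
      case t: Throwable =>
        System.err.println(s"Thread $name died unexpectedly: $t")
        Runtime.getRuntime.halt(1) // option 1: fail loudly rather than partially
    }
}
{code}

Option 2 would instead record the failure in the catch block (for example, incrementing a "dead threads" gauge exposed over JMX) rather than halting the process.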
[jira] [Created] (KAFKA-4596) KIP-73 rebalance throttling breaks on plans for specific partitions
Tom Crayford created KAFKA-4596:
---

Summary: KIP-73 rebalance throttling breaks on plans for specific partitions
Key: KAFKA-4596
URL: https://issues.apache.org/jira/browse/KAFKA-4596
Project: Kafka
Issue Type: Bug
Environment: Kafka 0.10.1.1
Reporter: Tom Crayford

The reassign-partitions.sh command fails if you both *throttle* and give it a specific partition reassignment. For example, upon reassigning {{__consumer_offsets}} partition 19, you get the following error:

{code}
Save this to use as the --reassignment-json-file option during rollback
Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value.
The throttle limit was set to 1048576 B/s
Partitions reassignment failed due to key not found: [__consumer_offsets,30]
java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:59)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:59)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
    at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
    at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
    at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
    at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
    at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
    at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
    at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
    at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
{code}

This effectively breaks the throttling feature unless you want to rebalance a great many partitions at once.
For reference, the command that was run is:

{code}
kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
{code}

and the contents of the plan file are:

{code}
{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
{code}

This seems like a simple logic error to me: we're looking up a partition that isn't part of the proposed reassignment, when we should not be. It looks like the logic assumes that {{Map.apply}} doesn't throw if the key isn't found, when in fact it does. I checked that this cluster does indeed have the __consumer_offsets topic populated.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
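A tiny sketch of the suspected logic error, with made-up names rather than the actual {{ReassignPartitionsCommand}} internals: {{Map.apply}} throws {{NoSuchElementException}} on a missing key, so any lookup for a partition that isn't in the proposed plan needs to be guarded.

{code}
// Illustrative only; proposedMoves stands in for the proposed reassignment,
// not a real field in ReassignPartitionsCommand.
val proposedMoves: Map[(String, Int), Seq[Int]] =
  Map(("__consumer_offsets", 19) -> Seq(2, 1, 0))

// proposedMoves(("__consumer_offsets", 30))
// ^ throws java.util.NoSuchElementException, matching the stack trace above

// Guarded alternatives that simply skip partitions not in the plan:
val maybeReplicas = proposedMoves.get(("__consumer_offsets", 30)) // None
val onlyProposed = Seq(("__consumer_offsets", 19), ("__consumer_offsets", 30)).filter(proposedMoves.contains) // keeps only partition 19
{code}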
[jira] [Created] (KAFKA-4441) Kafka Monitoring is incorrect during rapid topic creation and deletion
Tom Crayford created KAFKA-4441:
---

Summary: Kafka Monitoring is incorrect during rapid topic creation and deletion
Key: KAFKA-4441
URL: https://issues.apache.org/jira/browse/KAFKA-4441
Project: Kafka
Issue Type: Bug
Affects Versions: 0.10.0.1, 0.10.0.0
Reporter: Tom Crayford
Priority: Minor

Kafka reports several metrics based on the state of partitions:

- UnderReplicatedPartitions
- PreferredReplicaImbalanceCount
- OfflinePartitionsCount

All of these metrics fire when topics are rapidly created and deleted in a tight loop, even though the partitions triggering them belong to topics that are still undergoing creation or deletion and the cluster is otherwise stable.

Looking through the source code, topic deletion goes through an asynchronous state machine: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/TopicDeletionManager.scala#L35. However, the metrics do not know about the progress of this state machine: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L185

I believe the fix is relatively simple: make the metrics aware that a topic is currently undergoing deletion or creation, and only count topics that are "stable".

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
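As a rough sketch of that fix (names are illustrative, not the actual {{KafkaController}} gauge code), each gauge would skip topics the deletion manager still has in flight:

{code}
// Illustrative sketch: topicsBeingDeleted stands in for TopicDeletionManager
// state, and the tuples are (topic, partition) pairs known to the controller.
def offlinePartitionsCount(allPartitions: Set[(String, Int)],
                           onlinePartitions: Set[(String, Int)],
                           topicsBeingDeleted: Set[String]): Int =
  allPartitions.count { case (topic, partition) =>
    !topicsBeingDeleted.contains(topic) && !onlinePartitions.contains((topic, partition))
  }
{code}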
[jira] [Created] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
Tom Crayford created KAFKA-4084:
---

Summary: automated leader rebalance causes replication downtime for clusters with too many partitions
Key: KAFKA-4084
URL: https://issues.apache.org/jira/browse/KAFKA-4084
Project: Kafka
Issue Type: Bug
Components: controller
Affects Versions: 0.10.0.1, 0.10.0.0, 0.9.0.1, 0.8.2.2, 0.9.0.0
Reporter: Tom Crayford
Fix For: 0.10.1.0

If you enable {{auto.leader.rebalance.enable}} (which is on by default) and you have a cluster with many partitions, there is a severe amount of replication downtime following a restart. This causes {{UnderReplicatedPartitions}} to fire, and replication is paused.

This is because the current automated leader rebalance mechanism changes leaders for *all* imbalanced partitions at once, instead of doing it gradually. This effectively stops all replica fetchers in the cluster (assuming there are enough imbalanced partitions), and restarts them. This can take minutes on busy clusters, during which no replication is happening and user data is at risk. Clients with {{acks=-1}} also see issues at this time, because replication is effectively stalled.

To quote Todd Palino from the mailing list:

bq. There is an admin CLI command to trigger the preferred replica election manually. There is also a broker configuration “auto.leader.rebalance.enable” which you can set to have the broker automatically perform the PLE when needed. DO NOT USE THIS OPTION. There are serious performance issues when doing so, especially on larger clusters. It needs some development work that has not been fully identified yet.

This setting is extremely useful for smaller clusters, but with high partition counts it causes the severe issues stated above.

One potential fix is a new configuration for the number of partitions to rebalance at once: *stop* triggering new leader changes once that many rebalances are in flight, and resume as they complete. There may be better mechanisms, and I'd love to hear if anybody has any ideas.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
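A sketch of that proposed throttle, assuming a made-up config name ({{leader.rebalance.max.in.flight}}), so only a bounded batch of preferred-leader elections is triggered per rebalance pass:

{code}
// Illustrative only; the config name and types are assumptions, not Kafka's.
val maxInFlight = 100 // hypothetical leader.rebalance.max.in.flight

def nextElectionBatch(imbalanced: Seq[(String, Int)],
                      inFlight: Set[(String, Int)]): Seq[(String, Int)] = {
  val budget = maxInFlight - inFlight.size
  if (budget <= 0) Seq.empty // wait for in-flight elections to complete
  else imbalanced.filterNot(inFlight.contains).take(budget)
}
{code}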
[jira] [Commented] (KAFKA-3894) LogCleaner thread crashes if not even one segment can fit in the offset map
[ https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417675#comment-15417675 ]

Tom Crayford commented on KAFKA-3894:
---

Hi Jun,

We're probably going to start on b. for now. I think a. is incredibly valuable, but it doesn't address this particular way the log cleaner crashes. There are some cases where we will still fail to clean up data, but having those exist seems far preferable to crashing the thread entirely. We'll get started with b., and hopefully will have a patch up within a few business days.

> LogCleaner thread crashes if not even one segment can fit in the offset map
> ---
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Oracle JDK 8
> Ubuntu Precise
> Reporter: Tim Carey-Smith
> Labels: compaction
> Fix For: 0.10.1.0
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be
> too large to fit into the dedupe buffer.
> The result of this is a log line:
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner
> \[kafka-log-cleaner-thread-0\], Error due to
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where
> cleaning of compacted topics is not running.
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason
> for this upper bound, or if this is a value which must be tuned for each
> specific use-case.
> Of more immediate concern is the fact that the thread crash is not visible
> via JMX or exposed as some form of service degradation.
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-3976) Kafka Controller gets stuck, potentially due to zookeeper session id reuse
Tom Crayford created KAFKA-3976:
---

Summary: Kafka Controller gets stuck, potentially due to zookeeper session id reuse
Key: KAFKA-3976
URL: https://issues.apache.org/jira/browse/KAFKA-3976
Project: Kafka
Issue Type: Bug
Affects Versions: 0.10.0.0, 0.9.0.1
Reporter: Tom Crayford

We've been seeing an issue with a kafka controller getting "stuck" under 0.9.0.1. We carefully monitor {{ActiveControllerCount}} on all brokers, and upon finding it to be 0 for long enough, page an operator. In this particular case (and we've seen similar a few times before), it appears as though some unusual conditions led to the cluster not re-electing a controller. This could also potentially be a zookeeper client bug. We've spent a while looking through the code, and it seems like a session gets re-established, but the zookeeper session state callbacks don't fire on all the listeners properly, which leaves the controller stuck.

Note that with the cluster in this bad state, we've got znodes at /controller and /brokers/ids/0 on this bad node:

{code}
get /controller
{"version":1,"brokerid":0,"timestamp":"1468870986582"}
cZxid = 0x2a646
ctime = Mon Jul 18 19:43:06 UTC 2016
mZxid = 0x2a646
mtime = Mon Jul 18 19:43:06 UTC 2016
pZxid = 0x2a646
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x155e49f2b230004
dataLength = 54
numChildren = 0

get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1468870990738","endpoints":["SSL://REDACTED"],"host":null,"version":2,"port":-1}
cZxid = 0x2a64e
ctime = Mon Jul 18 19:43:10 UTC 2016
mZxid = 0x2a64e
mtime = Mon Jul 18 19:43:10 UTC 2016
pZxid = 0x2a64e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x155e49f2b230004
dataLength = 144
numChildren = 0
{code}

However, that broker reports an {{ActiveControllerCount}} of 0 (this is a response out of jolokia):

{code}
{"request":{"mbean":"kafka.controller:name=ActiveControllerCount,type=KafkaController","type":"read"},"value":{"Value":0},"timestamp":1468936463,"status":200}
{code}

If you look at the attached log files, you can see that *both* brokers went through a brief period where they thought they were the controller, then both resigned, and the callbacks meant to fire upon establishment of new sessions did not fire properly, which leads to a "stuck" controller. It's perhaps worth noting that these brokers have *no* more log output at all since this point. They are relatively unused though, so that is not that surprising.

Logs are here (this is a two broker cluster): https://gist.github.com/tcrayford/cbf3d5aa1c46194eb7a98786d83b0eab

Sensitive data have been redacted, replaced with REDACTED_$TYPE.

The most interesting stuff happens right at the bottom of each log file, where it appears a zookeeper session is timed out, then re-established. From my understanding of the ZAB protocol, Kafka shouldn't reuse session ids here, but it appears to be doing so. That's potentially the issue.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
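For reference, the check we run is essentially reading that same gauge over JMX on every broker and paging if none of them reports 1. A minimal sketch against the platform MBean server (in practice this goes over a remote JMX or Jolokia connection; the MBean name matches the jolokia request above):

{code}
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Reads the ActiveControllerCount gauge; a healthy cluster has exactly one
// broker reporting 1. Sketch only: queries the local platform MBean server.
def activeControllerCount(): Int = {
  val mbs  = ManagementFactory.getPlatformMBeanServer
  val name = new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount")
  mbs.getAttribute(name, "Value").asInstanceOf[Int]
}
{code}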
[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts
[ https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15376858#comment-15376858 ]

Tom Crayford commented on KAFKA-3894:
---

Jun: #4 seems potentially very complex to me. It also doesn't work in the case that the broker is shut down and the dedupe buffer size adjusted.

I much prefer #3 - it maps fine into the existing model as far as I can tell - we'd "just" split the log file we're cleaning once the offset map is full. That of course requires a little more IO, but it doesn't involve implementing (or using a library for) sketches that could potentially be incorrect. It also seems like the right long term solution, and more robust than automatically rolling log files some of the time. Am I missing something here? (A rough sketch of this splitting approach follows at the end of this message.)

Upsides of #3 vs #4:
* We can now clean the largest log segment, no matter the buffer size.
* We don't increase complexity of the produce path, or change memory usage.
* We don't have to implement or reuse a library for estimating unique keys.
* We don't have to figure out storing the key estimate (e.g. in the index or in a new file alongside each segment).

Downsides:
* It would increase the complexity of the cleaner. The code that swaps in and out segments will also get more complex, and the crash-safety of that code is already tricky.

Exists in both: larger log segments could potentially be split a lot, and not always deduplicated that well together. For example, if I write the max number of unique keys for the offset map into a topic, then the segment rolls, then I write a tombstone for every one of the previously sent messages, then neither #3 nor #4 would ever clear up any data. This is no worse than today though.

Cassandra and other LSM-based systems that do log structured storage and over-time compaction use similar "splitting and combining" mechanisms to ensure everything gets cleared up over time without using too much memory. They have a very different storage architecture and goals to Kafka's compaction, for sure, but it's interesting to note that they care about similar things.

> Log Cleaner thread crashes and never restarts
> ---
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
> Reporter: Tim Carey-Smith
> Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be
> too large to fit into the dedupe buffer.
> The result of this is a log line:
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner
> \[kafka-log-cleaner-thread-0\], Error due to
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where
> cleaning of compacted topics is not running.
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason
> for this upper bound, or if this is a value which must be tuned for each
> specific use-case.
> Of more immediate concern is the fact that the thread crash is not visible
> via JMX or exposed as some form of service degradation.
> Some short-term remediations we have made are: > * increasing the size of the dedupe buffer > * monitoring the log-cleaner threads inside the JVM -- This message was sent by Atlassian JIRA (v6.3.4#6332)
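To illustrate the #3 approach described in the comment above, here is a self-contained sketch (the {{OffsetMapLike}} trait and names are stand-ins, not Kafka's {{SkimpyOffsetMap}} API): fill the offset map until it is full and cut the cleaning range there, so a segment with more keys than the dedupe buffer can be cleaned in pieces.

{code}
// Illustrative stand-in for the offset map; not the real SkimpyOffsetMap API.
trait OffsetMapLike {
  def put(key: String, offset: Long): Unit
  def isFull: Boolean
}

// Walks a segment's records and returns the last offset that fit in the map.
// Cleaning would stop at that offset, swap in the partially cleaned portion,
// and resume from the cut point with a fresh map.
def fillUntilFull(records: Iterator[(String, Long)], map: OffsetMapLike): Long = {
  var lastFitted = -1L
  while (records.hasNext && !map.isFull) {
    val (key, offset) = records.next()
    map.put(key, offset)
    lastFitted = offset
  }
  lastFitted
}
{code}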
[jira] [Created] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot
Tom Crayford created KAFKA-3955:
---

Summary: Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot
Key: KAFKA-3955
URL: https://issues.apache.org/jira/browse/KAFKA-3955
Project: Kafka
Issue Type: Bug
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2, 0.9.0.0, 0.8.2.1, 0.8.2.0, 0.8.1.1, 0.8.1, 0.8.0
Reporter: Tom Crayford

Hi, I've found a bug impacting kafka brokers on startup after an unclean shutdown.

If a log segment is corrupt and has non-monotonic offsets (see the appendix of this bug for a sample output from {{DumpLogSegments}}), then the index append during {{LogSegment.recover}} throws an {{InvalidOffsetException}}: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218

That code is called by {{LogSegment.recover}}: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191

Which is called in several places in {{Log.scala}}. Notably it's called four times during recovery:

Thrice in Log.loadSegments
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226

and once in Log.recoverLog
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268

Of these, only the very last one has a {{catch}} for {{InvalidOffsetException}}. When it catches the issue, it truncates the whole log (not just the bad segment) back to the base offset of the bad segment: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274

However, that truncation can't be hit during recovery, because of the code paths in {{loadSegments}}: the earlier calls throw the exception first, it propagates all the way to the top-level exception handler, and the JVM crashes before truncation can happen.

As {{Log.recoverLog}} is always called during recovery, I *think* a fix for this is to move this crash recovery/truncate code into a new method in {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. That method should return the number of {{truncatedBytes}} like {{Log.recoverLog}} does now, and then truncate the log. The callers will have to be notified to "stop iterating over files in the directory", likely via that {{truncatedBytes}} return value. (A minimal sketch of this flow follows after the stack trace below.)

I'm happy working on a patch for this. I'm aware this recovery code is tricky and important to get right.

I'm also curious (and don't yet have a good theory) as to how this log segment got into this state with non-monotonic offsets. This segment is using gzip compression, and is under 0.9.0.1. The same bug with respect to recovery exists in trunk, but I'm unsure if the new handling around compressed messages (KIP-31) means the bug where non-monotonic offsets get appended is still present in trunk.

As a production workaround, one can manually truncate that log folder (delete all .index/.log files including and after the one with the bad offset). However, Kafka should (and can) handle this case well - with replication we can truncate during broker startup.

Stack trace and error message:

{code}
pri=WARN t=pool-3-thread-4 at=Log Found a corrupted index file, /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding index...
pri=ERROR t=main at=LogManager There was an error in one of the threads during logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. Prepare to shutdown
kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
    at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
    at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
    at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
    at kafka.log.LogSegment.recover(LogSegment.scala:188)
    at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
    at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at ...
{code}
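To make the proposed fix above concrete, here is a minimal, self-contained sketch of the control flow (all names and types are illustrative stand-ins, not the actual {{Log.scala}} internals): instead of letting an {{InvalidOffsetException}} escape and kill the broker, recovery catches it per segment, truncates from that segment's base offset, and stops iterating over the remaining files.

{code}
// Illustrative stand-ins only.
final class InvalidOffsetException(msg: String) extends RuntimeException(msg)

final case class Segment(baseOffset: Long, hasNonMonotonicOffsets: Boolean) {
  def recover(): Unit =
    if (hasNonMonotonicOffsets)
      throw new InvalidOffsetException(s"non-monotonic offsets in segment $baseOffset")
}

// Returns Some(offset to truncate from) if a corrupt segment was found, else None.
// Callers would truncate the log from that offset and stop loading further segments.
def recoverSegments(segments: Seq[Segment]): Option[Long] =
  segments.iterator.map { s =>
    try { s.recover(); None }
    catch { case _: InvalidOffsetException => Some(s.baseOffset) }
  }.collectFirst { case Some(offset) => offset }
{code}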
[jira] [Created] (KAFKA-3937) Kafka Clients Leak Native Memory For Longer Than Needed With Compressed Messages
Tom Crayford created KAFKA-3937:
---

Summary: Kafka Clients Leak Native Memory For Longer Than Needed With Compressed Messages
Key: KAFKA-3937
URL: https://issues.apache.org/jira/browse/KAFKA-3937
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2
Environment: Linux, latest oracle java-8
Reporter: Tom Crayford
Priority: Minor

In https://issues.apache.org/jira/browse/KAFKA-3933, we discovered that brokers can crash when performing log recovery, as they leak native memory whilst decompressing compressed segments, and that native memory isn't cleaned up rapidly enough by garbage collection and finalizers. The work to fix that in the brokers is happening in https://github.com/apache/kafka/pull/1598.

As part of that PR, Ismael Juma asked me to fix similar issues in the client. Rather than have one large PR that fixes everything, I'd rather break this work up into separate pieces, so I'm filing this JIRA to track the follow-up work. I should get to a PR on this at some point relatively soon, once the other PR has landed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory
[ https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367625#comment-15367625 ] Tom Crayford commented on KAFKA-3933: - Ismael: I've pushed the PR here: https://github.com/apache/kafka/pull/1598. For now, I've only fixed the precise memory issue that has been causing us notable production issues. I'm happy picking up other parts of the codebase where this can happen, but would rather get feedback on the first part of the approach for now (this is my first time contributing code to Kafka). I couldn't see a good or easy way to write any unit tests for this code right now. > Kafka OOM During Log Recovery Due to Leaked Native Memory > - > > Key: KAFKA-3933 > URL: https://issues.apache.org/jira/browse/KAFKA-3933 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0 > Environment: Linux, latest oracle java-8 >Reporter: Tom Crayford >Assignee: Tom Crayford >Priority: Critical > Fix For: 0.10.0.1 > > > Hi there. We've been tracking an issue where Kafka hits an > java.lang.OutOfMemoryError during log recovery. > After a bunch of tracking work, we've realized we've hit an instance of a > long known issue: http://www.evanjones.ca/java-native-leak-bug.html > TLDR: Kafka breaks the rule "Always close GZIPInputStream and > GZIPOutputStream since they use native memory via zlib" from that article. > As such, during broker startup, when you're recovering log segments that have > been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place. > Our crashes during startup have this profile - the JVM heap is empty (a few > hundred MB), but the offheap memory is full of allocations caused by > `Java_java_util_zip_Deflater_init` and `deflatInit2`. > This leads to broker crashes during startup. The only real mitigation is > having *far* more memory than you need to boot (which I'd guess is why folk > haven't noticed this in production that much yet). > To dig into the code more (this is based on trunk). Log recovery on unflushed > segments eventually calls `LogSegment.recover`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172 > On compressed segments, that leads to a call to `deepIterator`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > That leads to a call to `CompressionFactory`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95 > which creates a `GZIPInputStream`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46 > That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure > means that the finalizer on `GZIPInputStream` that deallocates the native > buffers is never called, because GC is never triggered. Instead, we just > exhaust the offheap memory and then Kafka dies from an OutOfMemory error. > Kafka *does* trigger an `inputstream.close()` call, but only when *fully* > reading the whole input stream (see > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156). > When it's performing log recovery, in > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > it doesn't read to the end of the stream, but instead reads the first offset > and leaves things alone. > This issue likely impacts `lz4` and `snappy` compressed topics in exactly the > same way. 
I think (but haven't 100% verified) that it impacts all versions of > Kafka that are supported (0.8 -> 0.10). > Fixing this seems relatively annoying, but only because of some "small > matters of coding", nothing hugely problematic. > The main issue is that `deepIterator` only returns an `Iterator`, which > doesn't have a `close()` method of any kind. We could create a new > `ClosableIterator` trait and have it extend Java's `AutoCloseable` > (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), > then explicitly call `close()` everywhere we use a `deepIterator()` and don't > always read to the end. Scala unfortunately doesn't seem to have a built in > version of Java's `try-with-resources` statement, but we can explicitly call > close everywhere perfectly happily. > Another (but much more hacky) solution would be to always read to the end of > the iterator in `LogSegment.recover`, but that seems pretty bad, using far > more resources than is needed during recovery. > I can't think of any other reasonable solutions for now, but would love to > hear input from the community. > We're happy doing the work of developing a patch, but thought we'd
[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory
[ https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366721#comment-15366721 ] Tom Crayford commented on KAFKA-3933: - Done. I hope to work on a fix tomorrow. > Kafka OOM During Log Recovery Due to Leaked Native Memory > - > > Key: KAFKA-3933 > URL: https://issues.apache.org/jira/browse/KAFKA-3933 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0 > Environment: Linux, latest oracle java-8 >Reporter: Tom Crayford >Assignee: Tom Crayford >Priority: Critical > Fix For: 0.10.0.1 > > > Hi there. We've been tracking an issue where Kafka hits an > java.lang.OutOfMemoryError during log recovery. > After a bunch of tracking work, we've realized we've hit an instance of a > long known issue: http://www.evanjones.ca/java-native-leak-bug.html > TLDR: Kafka breaks the rule "Always close GZIPInputStream and > GZIPOutputStream since they use native memory via zlib" from that article. > As such, during broker startup, when you're recovering log segments that have > been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place. > Our crashes during startup have this profile - the JVM heap is empty (a few > hundred MB), but the offheap memory is full of allocations caused by > `Java_java_util_zip_Deflater_init` and `deflatInit2`. > This leads to broker crashes during startup. The only real mitigation is > having *far* more memory than you need to boot (which I'd guess is why folk > haven't noticed this in production that much yet). > To dig into the code more (this is based on trunk). Log recovery on unflushed > segments eventually calls `LogSegment.recover`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172 > On compressed segments, that leads to a call to `deepIterator`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > That leads to a call to `CompressionFactory`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95 > which creates a `GZIPInputStream`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46 > That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure > means that the finalizer on `GZIPInputStream` that deallocates the native > buffers is never called, because GC is never triggered. Instead, we just > exhaust the offheap memory and then Kafka dies from an OutOfMemory error. > Kafka *does* trigger an `inputstream.close()` call, but only when *fully* > reading the whole input stream (see > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156). > When it's performing log recovery, in > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > it doesn't read to the end of the stream, but instead reads the first offset > and leaves things alone. > This issue likely impacts `lz4` and `snappy` compressed topics in exactly the > same way. I think (but haven't 100% verified) that it impacts all versions of > Kafka that are supported (0.8 -> 0.10). > Fixing this seems relatively annoying, but only because of some "small > matters of coding", nothing hugely problematic. > The main issue is that `deepIterator` only returns an `Iterator`, which > doesn't have a `close()` method of any kind. 
We could create a new > `ClosableIterator` trait and have it extend Java's `AutoCloseable` > (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), > then explicitly call `close()` everywhere we use a `deepIterator()` and don't > always read to the end. Scala unfortunately doesn't seem to have a built in > version of Java's `try-with-resources` statement, but we can explicitly call > close everywhere perfectly happily. > Another (but much more hacky) solution would be to always read to the end of > the iterator in `LogSegment.recover`, but that seems pretty bad, using far > more resources than is needed during recovery. > I can't think of any other reasonable solutions for now, but would love to > hear input from the community. > We're happy doing the work of developing a patch, but thought we'd report the > issue before starting down that path. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory
[ https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Crayford reassigned KAFKA-3933: --- Assignee: Tom Crayford > Kafka OOM During Log Recovery Due to Leaked Native Memory > - > > Key: KAFKA-3933 > URL: https://issues.apache.org/jira/browse/KAFKA-3933 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0 > Environment: Linux, latest oracle java-8 >Reporter: Tom Crayford >Assignee: Tom Crayford >Priority: Critical > Fix For: 0.10.0.1 > > > Hi there. We've been tracking an issue where Kafka hits an > java.lang.OutOfMemoryError during log recovery. > After a bunch of tracking work, we've realized we've hit an instance of a > long known issue: http://www.evanjones.ca/java-native-leak-bug.html > TLDR: Kafka breaks the rule "Always close GZIPInputStream and > GZIPOutputStream since they use native memory via zlib" from that article. > As such, during broker startup, when you're recovering log segments that have > been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place. > Our crashes during startup have this profile - the JVM heap is empty (a few > hundred MB), but the offheap memory is full of allocations caused by > `Java_java_util_zip_Deflater_init` and `deflatInit2`. > This leads to broker crashes during startup. The only real mitigation is > having *far* more memory than you need to boot (which I'd guess is why folk > haven't noticed this in production that much yet). > To dig into the code more (this is based on trunk). Log recovery on unflushed > segments eventually calls `LogSegment.recover`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172 > On compressed segments, that leads to a call to `deepIterator`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > That leads to a call to `CompressionFactory`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95 > which creates a `GZIPInputStream`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46 > That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure > means that the finalizer on `GZIPInputStream` that deallocates the native > buffers is never called, because GC is never triggered. Instead, we just > exhaust the offheap memory and then Kafka dies from an OutOfMemory error. > Kafka *does* trigger an `inputstream.close()` call, but only when *fully* > reading the whole input stream (see > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156). > When it's performing log recovery, in > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > it doesn't read to the end of the stream, but instead reads the first offset > and leaves things alone. > This issue likely impacts `lz4` and `snappy` compressed topics in exactly the > same way. I think (but haven't 100% verified) that it impacts all versions of > Kafka that are supported (0.8 -> 0.10). > Fixing this seems relatively annoying, but only because of some "small > matters of coding", nothing hugely problematic. > The main issue is that `deepIterator` only returns an `Iterator`, which > doesn't have a `close()` method of any kind. 
We could create a new > `ClosableIterator` trait and have it extend Java's `AutoCloseable` > (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), > then explicitly call `close()` everywhere we use a `deepIterator()` and don't > always read to the end. Scala unfortunately doesn't seem to have a built in > version of Java's `try-with-resources` statement, but we can explicitly call > close everywhere perfectly happily. > Another (but much more hacky) solution would be to always read to the end of > the iterator in `LogSegment.recover`, but that seems pretty bad, using far > more resources than is needed during recovery. > I can't think of any other reasonable solutions for now, but would love to > hear input from the community. > We're happy doing the work of developing a patch, but thought we'd report the > issue before starting down that path. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory
[ https://issues.apache.org/jira/browse/KAFKA-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15366329#comment-15366329 ] Tom Crayford commented on KAFKA-3933: - That makes sense to me. Thanks for the pointer. > Kafka OOM During Log Recovery Due to Leaked Native Memory > - > > Key: KAFKA-3933 > URL: https://issues.apache.org/jira/browse/KAFKA-3933 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0 > Environment: Linux, latest oracle java-8 >Reporter: Tom Crayford >Priority: Critical > Fix For: 0.10.0.1 > > > Hi there. We've been tracking an issue where Kafka hits an > java.lang.OutOfMemoryError during log recovery. > After a bunch of tracking work, we've realized we've hit an instance of a > long known issue: http://www.evanjones.ca/java-native-leak-bug.html > TLDR: Kafka breaks the rule "Always close GZIPInputStream and > GZIPOutputStream since they use native memory via zlib" from that article. > As such, during broker startup, when you're recovering log segments that have > been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place. > Our crashes during startup have this profile - the JVM heap is empty (a few > hundred MB), but the offheap memory is full of allocations caused by > `Java_java_util_zip_Deflater_init` and `deflatInit2`. > This leads to broker crashes during startup. The only real mitigation is > having *far* more memory than you need to boot (which I'd guess is why folk > haven't noticed this in production that much yet). > To dig into the code more (this is based on trunk). Log recovery on unflushed > segments eventually calls `LogSegment.recover`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172 > On compressed segments, that leads to a call to `deepIterator`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > That leads to a call to `CompressionFactory`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95 > which creates a `GZIPInputStream`: > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46 > That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure > means that the finalizer on `GZIPInputStream` that deallocates the native > buffers is never called, because GC is never triggered. Instead, we just > exhaust the offheap memory and then Kafka dies from an OutOfMemory error. > Kafka *does* trigger an `inputstream.close()` call, but only when *fully* > reading the whole input stream (see > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156). > When it's performing log recovery, in > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 > it doesn't read to the end of the stream, but instead reads the first offset > and leaves things alone. > This issue likely impacts `lz4` and `snappy` compressed topics in exactly the > same way. I think (but haven't 100% verified) that it impacts all versions of > Kafka that are supported (0.8 -> 0.10). > Fixing this seems relatively annoying, but only because of some "small > matters of coding", nothing hugely problematic. > The main issue is that `deepIterator` only returns an `Iterator`, which > doesn't have a `close()` method of any kind. 
We could create a new > `ClosableIterator` trait and have it extend Java's `AutoCloseable` > (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), > then explicitly call `close()` everywhere we use a `deepIterator()` and don't > always read to the end. Scala unfortunately doesn't seem to have a built in > version of Java's `try-with-resources` statement, but we can explicitly call > close everywhere perfectly happily. > Another (but much more hacky) solution would be to always read to the end of > the iterator in `LogSegment.recover`, but that seems pretty bad, using far > more resources than is needed during recovery. > I can't think of any other reasonable solutions for now, but would love to > hear input from the community. > We're happy doing the work of developing a patch, but thought we'd report the > issue before starting down that path. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3933) Kafka OOM During Log Recovery Due to Leaked Native Memory
Tom Crayford created KAFKA-3933:
---

Summary: Kafka OOM During Log Recovery Due to Leaked Native Memory
Key: KAFKA-3933
URL: https://issues.apache.org/jira/browse/KAFKA-3933
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 0.10.0.0, 0.9.0.1, 0.8.2.2
Environment: Linux, latest oracle java-8
Reporter: Tom Crayford

Hi there. We've been tracking an issue where Kafka hits an java.lang.OutOfMemoryError during log recovery. After a bunch of tracking work, we've realized we've hit an instance of a long known issue: http://www.evanjones.ca/java-native-leak-bug.html

TLDR: Kafka breaks the rule "Always close GZIPInputStream and GZIPOutputStream since they use native memory via zlib" from that article.

As such, during broker startup, when you're recovering log segments that have been compressed with gzip, Kafka leaks `GZIPInputStream` all over the place. Our crashes during startup have this profile - the JVM heap is empty (a few hundred MB), but the offheap memory is full of allocations caused by `Java_java_util_zip_Deflater_init` and `deflatInit2`.

This leads to broker crashes during startup. The only real mitigation is having *far* more memory than you need to boot (which I'd guess is why folk haven't noticed this in production that much yet).

To dig into the code more (this is based on trunk). Log recovery on unflushed segments eventually calls `LogSegment.recover`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L172

On compressed segments, that leads to a call to `deepIterator`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189

That leads to a call to `CompressionFactory`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L95

which creates a `GZIPInputStream`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/CompressionFactory.scala#L46

That `GZIPInputStream` is never closed anywhere, and the low *heap* pressure means that the finalizer on `GZIPInputStream` that deallocates the native buffers is never called, because GC is never triggered. Instead, we just exhaust the offheap memory and then Kafka dies from an OutOfMemory error.

Kafka *does* trigger an `inputstream.close()` call, but only when *fully* reading the whole input stream (see https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L156). When it's performing log recovery, in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L189 it doesn't read to the end of the stream, but instead reads the first offset and leaves things alone.

This issue likely impacts `lz4` and `snappy` compressed topics in exactly the same way. I think (but haven't 100% verified) that it impacts all versions of Kafka that are supported (0.8 -> 0.10).

Fixing this seems relatively annoying, but only because of some "small matters of coding", nothing hugely problematic. The main issue is that `deepIterator` only returns an `Iterator`, which doesn't have a `close()` method of any kind. We could create a new `ClosableIterator` trait and have it extend Java's `AutoCloseable` (https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html), then explicitly call `close()` everywhere we use a `deepIterator()` and don't always read to the end. Scala unfortunately doesn't seem to have a built in version of Java's `try-with-resources` statement, but we can explicitly call close everywhere perfectly happily.

Another (but much more hacky) solution would be to always read to the end of the iterator in `LogSegment.recover`, but that seems pretty bad, using far more resources than is needed during recovery.

I can't think of any other reasonable solutions for now, but would love to hear input from the community. We're happy doing the work of developing a patch, but thought we'd report the issue before starting down that path.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
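A hedged sketch of the `ClosableIterator` shape proposed above, with illustrative names (this is not the shape of any eventual patch): the iterator owns the `GZIPInputStream` and frees zlib's native memory on `close()`, even when the caller stops after the first entry.

{code}
import java.io.InputStream
import java.util.zip.GZIPInputStream

// Illustrative only; names and shapes are assumptions for this sketch.
trait ClosableIterator[T] extends Iterator[T] with AutoCloseable

def gzipByteIterator(in: InputStream): ClosableIterator[Int] = {
  val gzip = new GZIPInputStream(in)
  new ClosableIterator[Int] {
    private var nextByte = gzip.read()
    def hasNext: Boolean = nextByte != -1
    def next(): Int = { val b = nextByte; nextByte = gzip.read(); b }
    // Releases zlib's native buffers immediately instead of waiting for a
    // finalizer that may never run when heap pressure is low.
    def close(): Unit = gzip.close()
  }
}

// A caller that stops early (like recovery reading only the first entry) must still close:
//   val it = gzipByteIterator(stream)
//   try it.next() finally it.close()
{code}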
[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts
[ https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346463#comment-15346463 ]

Tom Crayford commented on KAFKA-3894:
---

(disclaimer: I work with Tim)

It feels like there are a few pieces of work to do here:

1. Expose the log cleaner state as a JMX metric (like BrokerState)
2. Somehow mark logs we've failed to clean as "busted" somewhere, and stop trying to clean them. That way, instead of erroring when this occurs, the broker doesn't stay completely busted but keeps working on all other partitions
3. I'm unsure, but is it possible to fix the underlying issue by only compacting partial segments of the log when the buffer size is smaller than the desired offset map? This seems like the hardest but most valuable fix here.

We're happy picking up at least some of these, but would love feedback from the community about priorities and the ease/appropriateness of these steps (and suggestions for other things to do). A rough sketch of 1 and 2 follows at the end of this message.

> Log Cleaner thread crashes and never restarts
> ---
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
> Reporter: Tim Carey-Smith
> Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be
> too large to fit into the dedupe buffer.
> The result of this is a log line:
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner
> \[kafka-log-cleaner-thread-0\], Error due to
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where
> cleaning of compacted topics is not running.
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason
> for this upper bound, or if this is a value which must be tuned for each
> specific use-case.
> Of more immediate concern is the fact that the thread crash is not visible
> via JMX or exposed as some form of service degradation.
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
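To illustrate items 1 and 2, a small sketch (the metric name and wiring are assumptions, not Kafka's actual LogCleaner code): remember partitions the cleaner has given up on and expose the count as a gauge, so a dead or blocked cleaner is visible without a thread dump.

{code}
import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only.
object CleanerHealth {
  private val uncleanable = ConcurrentHashMap.newKeySet[String]()

  // Called instead of letting the cleaner thread die when a partition fails to clean.
  def markUncleanable(topicPartition: String): Unit = uncleanable.add(topicPartition)

  // Value a hypothetical "uncleanable-partitions-count" JMX gauge would report;
  // a non-zero value (or a missing cleaner thread) should trigger an alert.
  def uncleanablePartitionCount: Int = uncleanable.size
}
{code}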