Gian Merlino created KAFKA-2118:
-----------------------------------

             Summary: Cleaner cannot clean after shutdown during replaceSegments
                 Key: KAFKA-2118
                 URL: https://issues.apache.org/jira/browse/KAFKA-2118
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.2.0
            Reporter: Gian Merlino


If a broker shuts down after the cleaner calls replaceSegments with more than 
one segment, the partition can be left in an uncleanable state. We saw this on 
a few brokers after doing a rolling update. The sequence of events we saw was:

1) Cleaner cleaned segments with base offsets 0, 1094621529, and 1094831997 
into a new segment 0.
2) Cleaner logged "Swapping in cleaned segment 0 for segment(s) 
0,1094621529,1094831997 in log xxx-15." and called replaceSegments.
3) 0.cleaned was renamed to 0.swap.
4) Broker shut down before deleting segments 1094621529 and 1094831997.
5) Broker started up and logged "Found log file 
/mnt/persistent/kafka-logs/xxx-15/00000000000000000000.log.swap from 
interrupted swap operation, repairing."
6) Cleaner thread died with the exception "kafka.common.InvalidOffsetException: 
Attempt to append an offset (1094911424) to position 1003 no larger than the 
last offset appended (1095045873) to 
/mnt/persistent/kafka-logs/xxx-15/00000000000000000000.index.cleaned."
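
The crash window between steps 3 and 4 can be illustrated with a toy model of 
the swap protocol (all names here are hypothetical illustrations, not Kafka's 
actual code, which lives in kafka.log.Log):

```python
# Toy model of the replaceSegments crash window described in steps 2-4.
# A log directory is modeled as a dict of {filename: contents}.

def replace_segments(log_dir, cleaned, old_segments, crash_after_rename=False):
    """Swap a .cleaned segment in for the old segments it replaced.

    The rename to .swap (step 3) and the deletion of the old segments
    (step 4) are separate operations, so a shutdown in between leaves
    both the swapped segment AND the old segments on disk.
    """
    # Step 3: atomically rename the cleaned segment to .swap.
    log_dir[cleaned + ".swap"] = log_dir.pop(cleaned + ".cleaned")
    if crash_after_rename:
        return log_dir  # broker dies before the async deletes run
    # Step 4: delete the segments that were merged into the cleaned one.
    for seg in old_segments:
        log_dir.pop(seg + ".log", None)
    # Final rename: .swap becomes the live segment.
    log_dir[cleaned + ".log"] = log_dir.pop(cleaned + ".swap")
    return log_dir

# After a crash, the merged segment (as .swap) and the old segments coexist:
state = replace_segments(
    {"0.cleaned": "merged 0+1094621529+1094831997",
     "1094621529.log": "...", "1094831997.log": "..."},
    cleaned="0", old_segments=["1094621529", "1094831997"],
    crash_after_rename=True)
```

On restart the broker repairs the .swap file into segment 0, but the two old 
segments are still present, which is the state step 5's log line reflects.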

I think what's happening in #6 is that when the broker started back up and 
repaired the log, segment 0 ended up with a bunch of messages that were also in 
segments 1094621529 and 1094831997 (because the new segment 0 was created from 
cleaning all 3). But segments 1094621529 and 1094831997 were still on disk, so 
offsets on disk were no longer monotonically increasing, violating the 
assumption of OffsetIndex. We ended up fixing this by deleting segments 
1094621529 and 1094831997 manually, and then removing the line for this 
partition from the cleaner-offset-checkpoint file (otherwise it would reference 
the non-existent segment 1094621529).
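
The invariant being violated can be shown with a minimal sketch (a simplified 
stand-in for illustration only, not Kafka's actual OffsetIndex, which is in 
kafka.log.OffsetIndex):

```python
class MiniOffsetIndex:
    """Minimal stand-in for an offset index: entries must be appended
    in strictly increasing offset order, mirroring the check that
    throws InvalidOffsetException in the report above."""

    def __init__(self):
        self.entries = []  # (offset, file position) pairs

    def append(self, offset, position):
        if self.entries and offset <= self.entries[-1][0]:
            raise ValueError(
                "Attempt to append an offset (%d) no larger than the "
                "last offset appended (%d)" % (offset, self.entries[-1][0]))
        self.entries.append((offset, position))

index = MiniOffsetIndex()
# Repaired segment 0 already contains messages up through this offset,
# because it was built by cleaning all three original segments:
index.append(1095045873, 1002)
# The stale segment 1094621529 is still on disk, so the cleaner then
# sees an older offset again and the append is rejected:
try:
    index.append(1094911424, 1003)
    rejected = False
except ValueError:
    rejected = True
```

Once offsets on disk are no longer monotonic across segments, every cleaning 
attempt hits this check, which is why the partition stays uncleanable.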

This can happen even on a clean shutdown (the async deletes in replaceSegments 
might not happen).
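
The manual fix described above (removing the partition's line from 
cleaner-offset-checkpoint after deleting the stale segments) can be sketched 
roughly as follows, assuming the checkpoint layout of a version line, an entry 
count, then one "topic partition offset" line per partition; verify the format 
against your broker's actual file before attempting anything like this:

```python
def drop_partition(checkpoint_text, topic, partition):
    """Remove one partition's entry from a cleaner-offset-checkpoint
    file's contents, keeping the entry count on line 2 consistent.

    Assumed layout (check against your broker's file):
      line 1: format version
      line 2: number of entries
      then:   "<topic> <partition> <offset>" per entry
    """
    lines = checkpoint_text.strip().split("\n")
    version, entries = lines[0], lines[2:]
    kept = [e for e in entries
            if e.split()[:2] != [topic, str(partition)]]
    return "\n".join([version, str(len(kept))] + kept) + "\n"

# Example: drop the entry for partition 15 of topic "xxx".
before = "0\n2\nxxx 15 1094621529\nyyy 3 500\n"
after = drop_partition(before, "xxx", 15)
```

This only edits the checkpoint contents; the stale segment files (and their 
index files) still have to be deleted separately, with the broker stopped.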

Cleaner logs post-startup:
2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Beginning cleaning of log xxx-15.
2015-04-12 15:07:56,533 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Building offset map for xxx-15...
2015-04-12 15:07:56,595 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Building offset map for log xxx-15 for 6 segments in offset range 
[1094621529, 1095924157).
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Offset map for log xxx-15 complete.
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Cleaning log xxx-15 (discarding tombstones prior to Sun Apr 12 
14:05:37 UTC 2015)...
2015-04-12 15:08:01,443 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Cleaning segment 0 in log xxx-15 (last modified Sun Apr 12 
14:05:38 UTC 2015) into 0, retaining deletes.
2015-04-12 15:08:04,283 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Cleaning segment 1094621529 in log xxx-15 (last modified Sun Apr 
12 13:49:27 UTC 2015) into 0, discarding deletes.
2015-04-12 15:08:05,079 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- Cleaner 0: Cleaning segment 1094831997 in log xxx-15 (last modified Sun Apr 
12 14:04:28 UTC 2015) into 0, discarding deletes.
2015-04-12 15:08:05,157 ERROR [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- [kafka-log-cleaner-thread-0], Error due to
kafka.common.InvalidOffsetException: Attempt to append an offset (1094911424) 
to position 1003 no larger than the last offset appended (1095045873) to 
/mnt/persistent/kafka-logs/xxx-15/00000000000000000000.index.cleaned.
at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.append(LogSegment.scala:81)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:427)
at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:358)
at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:354)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:354)
at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:321)
at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:320)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.log.Cleaner.clean(LogCleaner.scala:320)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
2015-04-12 15:08:05,157 INFO [kafka-log-cleaner-thread-0] kafka.log.LogCleaner 
- [kafka-log-cleaner-thread-0], Stopped



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)