Repository: kafka
Updated Branches:
  refs/heads/0.10.1 5908a2523 -> c4766089f

KAFKA-4205; KafkaApis: fix NPE caused by conversion to array

The NPE was caused by `log.logSegments.toArray` producing an array that
contained `null` values. The exact reason remains somewhat of a mystery to me,
but the culprit seems to be `JavaConverters` in combination with concurrent
data structure access. Here's a simple code example that demonstrates it:

```scala
import java.util.concurrent.ConcurrentSkipListMap
// Same as `JavaConversions`, but allows explicit conversions via `asScala`/`asJava` methods.
import scala.collection.JavaConverters._

case object Value

val m = new ConcurrentSkipListMap[Int, Value.type]

new Thread {
  override def run() = {
    while (true) m.put(9000, Value)
  }
}.start()

new Thread {
  override def run() = {
    while (true) m.remove(9000)
  }
}.start()

new Thread {
  override def run() = {
    while (true) {
      println(m.values.asScala.toArray.headOption)
    }
  }
}.start()
```

Running the example will occasionally print `Some(null)`, indicating that
something shady is going on during the `toArray` conversion. The `null`s
magically disappear after making the following change:

```diff
- println(m.values.asScala.toArray.headOption)
+ println(m.values.asScala.toSeq.headOption)
```
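
For what it's worth, a plausible mechanism (this is my reading of the
Scala 2.11-era collections library, not something verified against this exact
failure): `TraversableOnce.toArray` first allocates an array of `size` elements
and then copies the iterator into it. If the concurrent collection shrinks
between those two steps, the iterator yields fewer elements than `size`, and
the trailing slots keep their default value, `null`. `toSeq` and `toBuffer`
instead append elements as the iterator produces them, so no slot is ever left
unfilled. Under that assumption, the race can be reproduced deterministically
with an `Iterable` whose `size` disagrees with its iterator, which is also the
trick the new test below uses:

```scala
// Deterministic stand-in for a concurrent collection that shrinks between the
// `size` call and the iterator traversal: `size` reports 2, the iterator yields 1.
val shrinking: Iterable[String] = new Iterable[String] {
  override def size = 2
  def iterator = Iterator("only-element")
}

// `toArray` pre-allocates `size` slots but fills only as many as the iterator
// yields, so the last slot keeps its default value, i.e. null
// (under Scala 2.11/2.12 semantics; 2.13 rewrote `toArray` and avoids this).
println(shrinking.toArray.mkString("[", ", ", "]")) // [only-element, null]

// `toBuffer` appends while iterating, so its length matches what was produced.
println(shrinking.toBuffer)                         // ArrayBuffer(only-element)
```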

Author: Anton Karamanov <atara...@yandex-team.ru>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>

Closes #2204 from ataraxer/KAFKA-4205


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4766089
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4766089
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4766089

Branch: refs/heads/0.10.1
Commit: c4766089f75c0e3627dbd3d596256622d0c1beb7
Parents: 5908a25
Author: Anton Karamanov <atara...@yandex-team.ru>
Authored: Sun Dec 4 10:51:16 2016 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Sun Dec 4 10:59:02 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 12 +++++-----
 .../scala/kafka/log/LogCleanerManager.scala     |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 23 +++++++++++---------
 .../scala/unit/kafka/server/LogOffsetTest.scala | 19 ++++++++++++++++
 4 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f29cde7..aea1a08 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -598,19 +598,21 @@ class Log(val dir: File,
         s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
         s"required version $KAFKA_0_10_0_IV0")
 
+    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+    // constant time access while being safe to use with concurrent collections unlike `toArray`.
+    val segmentsCopy = logSegments.toBuffer
     // For the earliest and latest, we do not need to return the timestamp.
-    val segsArray = logSegments.toArray
     if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-      return Some(TimestampOffset(Message.NoTimestamp, segsArray(0).baseOffset))
+      return Some(TimestampOffset(Message.NoTimestamp, segmentsCopy.head.baseOffset))
     else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
       return Some(TimestampOffset(Message.NoTimestamp, logEndOffset))
 
     val targetSeg = {
       // Get all the segments whose largest timestamp is smaller than target timestamp
-      val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
+      val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
       // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
-      if (earlierSegs.length < segsArray.length)
-        Some(segsArray(earlierSegs.length))
+      if (earlierSegs.length < segmentsCopy.length)
+        Some(segmentsCopy(earlierSegs.length))
       else
         None
     }
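
A note on the shape of the fix above: take a single `toBuffer` snapshot of the
live segment view, then run every subsequent pass (`takeWhile`, `length`,
indexed access) against that stable copy, since two traversals of the live
view could observe different contents. A minimal sketch of the pattern (the
`Segment` case class and the numbers are invented for illustration; Kafka's
real `LogSegment` is more involved):

```scala
import java.util.concurrent.ConcurrentSkipListMap
import scala.collection.JavaConverters._

// Invented stand-in for kafka.log.LogSegment, for illustration only.
case class Segment(baseOffset: Long, largestTimestamp: Long)

val segments = new ConcurrentSkipListMap[Long, Segment]()
segments.put(0L, Segment(0L, 100L))
segments.put(10L, Segment(10L, 200L))

val targetTimestamp = 150L

// Snapshot once, then do all passes over the stable copy. Mixing passes over
// the live view could otherwise see the segment list change mid-computation.
val snapshot = segments.values.asScala.toBuffer
val earlierSegs = snapshot.takeWhile(_.largestTimestamp < targetTimestamp)
val targetSeg =
  if (earlierSegs.length < snapshot.length) Some(snapshot(earlierSegs.length))
  else None

println(targetSeg) // Some(Segment(10,200))
```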

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index b3e6e72..0cfe6c3 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -290,7 +290,7 @@ private[log] object LogCleanerManager extends Logging {
     }
 
     // dirty log segments
-    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray
+    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
 
     val compactionLagMs = math.max(log.config.compactionLagMs, 0L)


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b664ea7..197ddb5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -714,18 +714,21 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
-    val segsArray = log.logSegments.toArray
-    var offsetTimeArray: Array[(Long, Long)] = null
-    val lastSegmentHasSize = segsArray.last.size > 0
-    if (lastSegmentHasSize)
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
-    else
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
+    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+    // constant time access while being safe to use with concurrent collections unlike `toArray`.
+    val segments = log.logSegments.toBuffer
+    val lastSegmentHasSize = segments.last.size > 0
+
+    val offsetTimeArray =
+      if (lastSegmentHasSize)
+        new Array[(Long, Long)](segments.length + 1)
+      else
+        new Array[(Long, Long)](segments.length)
 
-    for (i <- 0 until segsArray.length)
-      offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
+    for (i <- segments.indices)
+      offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
     if (lastSegmentHasSize)
-      offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
+      offsetTimeArray(segments.length) = (log.logEndOffset, SystemTime.milliseconds)
 
     var startIndex = -1
     timestamp match {


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4766089/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 0885709..d98b82c 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -214,6 +214,25 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
   }
 
+  /* We test that `fetchOffsetsBefore` works correctly if `Log.logSegments` content and size are
+   * different (simulating a race condition) */
+  @Test
+  def testFetchOffsetsBeforeWithChangingSegments() {
+    val log = EasyMock.niceMock(classOf[Log])
+    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    EasyMock.expect(log.logSegments).andStubAnswer {
+      new IAnswer[Iterable[LogSegment]] {
+        def answer = new Iterable[LogSegment] {
+          override def size = 2
+          def iterator = Seq(logSegment).iterator
+        }
+      }
+    }
+    EasyMock.replay(logSegment)
+    EasyMock.replay(log)
+    server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
+  }
+
   private def createBrokerConfig(nodeId: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)