Repository: kafka
Updated Branches:
  refs/heads/trunk e43bbce49 -> 2d19ad4bb


KAFKA-4205; KafkaApis: fix NPE caused by conversion to array

NPE was caused by `log.logSegments.toArray` resulting in an array containing
`null` values. The exact reason remains somewhat of a mystery to me, but it
seems that the culprit is `JavaConverters` in combination with concurrent
data structure access.

Here's a simple code example that demonstrates the problem:
```scala
import java.util.concurrent.ConcurrentSkipListMap
// Same as `JavaConversions`, but allows explicit conversions via `asScala`/`asJava` methods.
import scala.collection.JavaConverters._

case object Value
val m = new ConcurrentSkipListMap[Int, Value.type]
new Thread { override def run() = { while (true) m.put(9000, Value) } }.start()
new Thread { override def run() = { while (true) m.remove(9000) } }.start()
new Thread { override def run() = { while (true) { println(m.values.asScala.toArray.headOption) } } }.start()
```

Running the example will occasionally print `Some(null)`, indicating that something shady is going on during the `toArray` conversion.
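
To make the failure mode concrete, here is a deterministic sketch of my own (not part of the original report, and specific to the pre-2.13 Scala collection library, as far as I can tell): an `Iterable` whose reported `size` is larger than what its iterator yields, mimicking an element being removed between the size check and the copy. `toArray` pre-allocates an array of `size` elements and then copies whatever the iterator produces, leaving the tail as `null`; `toBuffer` and `toSeq` build by iterating and so cannot produce that padding.

```scala
// Stand-in for the race: `size` says 2, but the iterator only yields one element,
// as if another thread removed an entry between the size check and the copy.
val racy = new Iterable[String] {
  override def size = 2              // size observed "before" the removal
  def iterator = Iterator("segment") // contents observed "after" the removal
}

println(racy.toArray.toList)  // List(segment, null) -- pre-sized array, tail never written
println(racy.toBuffer.toList) // List(segment)       -- built by iterating, no padding
```

This is also exactly the situation the new `LogOffsetTest` case below simulates, using a mocked `Log` whose `logSegments` reports `size = 2` while its iterator yields a single segment.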

The `null`s magically disappear after making the following change:
```diff
- println(m.values.asScala.toArray.headOption)
+ println(m.values.asScala.toSeq.headOption)
```
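
Note that the actual patch below uses `toBuffer` rather than `toSeq` in the indexed code paths, since (per the comment added in the patch) it is likewise built via iteration and therefore safe with concurrent collections, while keeping constant-time indexed access. Continuing the repro above, a sketch of the equivalent change:

```scala
println(m.values.asScala.toBuffer.headOption)  // materializes through the iterator, so no nulls
```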

Author: Anton Karamanov <atara...@yandex-team.ru>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>

Closes #2204 from ataraxer/KAFKA-4205


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d19ad4b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d19ad4b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d19ad4b

Branch: refs/heads/trunk
Commit: 2d19ad4bb07b4f10f2a314e86bbf4171cdcd7765
Parents: e43bbce
Author: Anton Karamanov <atara...@yandex-team.ru>
Authored: Sun Dec 4 10:51:16 2016 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Sun Dec 4 10:51:16 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 12 +++++-----
 .../scala/kafka/log/LogCleanerManager.scala     |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 23 +++++++++++---------
 .../scala/unit/kafka/server/LogOffsetTest.scala | 19 ++++++++++++++++
 4 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2d19ad4b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 122f8be..6acc8d2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -605,19 +605,21 @@ class Log(@volatile var dir: File,
           s"for partition $topicAndPartition is ${config.messageFormatVersion} 
which is earlier than the minimum " +
           s"required version $KAFKA_0_10_0_IV0")
 
+    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+    // constant time access while being safe to use with concurrent collections unlike `toArray`.
+    val segmentsCopy = logSegments.toBuffer
     // For the earliest and latest, we do not need to return the timestamp.
-    val segsArray = logSegments.toArray
     if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(Message.NoTimestamp, segsArray(0).baseOffset))
+        return Some(TimestampOffset(Message.NoTimestamp, segmentsCopy.head.baseOffset))
     else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
         return Some(TimestampOffset(Message.NoTimestamp, logEndOffset))
 
     val targetSeg = {
       // Get all the segments whose largest timestamp is smaller than target timestamp
-      val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
+      val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
       // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
-      if (earlierSegs.length < segsArray.length)
-        Some(segsArray(earlierSegs.length))
+      if (earlierSegs.length < segmentsCopy.length)
+        Some(segmentsCopy(earlierSegs.length))
       else
         None
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d19ad4b/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 92cbf0f..681042e 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -289,7 +289,7 @@ private[log] object LogCleanerManager extends Logging {
     }
 
     // dirty log segments
-    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray
+    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
 
     val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d19ad4b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fa3db5c..447e4e2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -717,18 +717,21 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
-    val segsArray = log.logSegments.toArray
-    var offsetTimeArray: Array[(Long, Long)] = null
-    val lastSegmentHasSize = segsArray.last.size > 0
-    if (lastSegmentHasSize)
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
-    else
-      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
+    // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+    // constant time access while being safe to use with concurrent collections unlike `toArray`.
+    val segments = log.logSegments.toBuffer
+    val lastSegmentHasSize = segments.last.size > 0
+
+    val offsetTimeArray =
+      if (lastSegmentHasSize)
+        new Array[(Long, Long)](segments.length + 1)
+      else
+        new Array[(Long, Long)](segments.length)
 
-    for (i <- segsArray.indices)
-      offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
+    for (i <- segments.indices)
+      offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
     if (lastSegmentHasSize)
-      offsetTimeArray(segsArray.length) = (log.logEndOffset, time.milliseconds)
+      offsetTimeArray(segments.length) = (log.logEndOffset, time.milliseconds)
 
     var startIndex = -1
     timestamp match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d19ad4b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 42e9be1..70445d7 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -214,6 +214,25 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
   }
 
+  /* We test that `fetchOffsetsBefore` works correctly if `Log.logSegments` content and size are
+   * different (simulating a race condition) */
+  @Test
+  def testFetchOffsetsBeforeWithChangingSegments() {
+    val log = EasyMock.niceMock(classOf[Log])
+    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    EasyMock.expect(log.logSegments).andStubAnswer {
+      new IAnswer[Iterable[LogSegment]] {
+        def answer = new Iterable[LogSegment] {
+          override def size = 2
+          def iterator = Seq(logSegment).iterator
+        }
+      }
+    }
+    EasyMock.replay(logSegment)
+    EasyMock.replay(log)
+    server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100)
+  }
+
   private def createBrokerConfig(nodeId: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
