Repository: kafka Updated Branches: refs/heads/trunk 7a0821d65 -> c46cc4802
KAFKA-2236; Offset request reply racing with segment rolling Author: William Thurston <[email protected]> Author: Ismael Juma <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #1318 from ijuma/KAFKA-2236-offset-request-reply-segment-rolling-race Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c46cc480 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c46cc480 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c46cc480 Branch: refs/heads/trunk Commit: c46cc480214080844ef0ca04d96f1db61b1f2ea3 Parents: 7a0821d Author: William Thurston <[email protected]> Authored: Wed May 4 14:26:30 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed May 4 14:26:30 2016 -0700 ---------------------------------------------------------------------- .../src/main/scala/kafka/server/KafkaApis.scala | 9 ++++---- .../scala/unit/kafka/server/LogOffsetTest.scala | 22 +++++++++++++++++++- 2 files changed, 26 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c46cc480/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index cf7814e..eb6358d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -579,17 +579,18 @@ class KafkaApis(val requestChannel: RequestChannel, } } - private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { + private[server] def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val segsArray = log.logSegments.toArray var offsetTimeArray: Array[(Long, Long)] = null - if (segsArray.last.size > 0) + val lastSegmentHasSize = segsArray.last.size > 0 + if (lastSegmentHasSize) offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1) else offsetTimeArray = new Array[(Long, Long)](segsArray.length) for (i <- 0 until segsArray.length) offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified) - if (segsArray.last.size > 0) + if (lastSegmentHasSize) offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds) var startIndex = -1 @@ -1048,4 +1049,4 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c46cc480/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index d5c696e..463cd8a 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -18,12 +18,14 @@ package kafka.server import java.io.File +import java.util.concurrent.atomic.AtomicLong import java.util.{Properties, Random} import kafka.admin.AdminUtils import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo} import kafka.common.TopicAndPartition import kafka.consumer.SimpleConsumer +import kafka.log.{Log, LogSegment} import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec} import kafka.utils.TestUtils._ import kafka.utils._ @@ -31,11 +33,12 @@ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.utils.Utils +import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ import org.junit.{After, Before, Test} class LogOffsetTest extends ZooKeeperTestHarness { - val random = new Random() + val random = new Random() var logDir: File = null var topicLogDir: File = null var server: KafkaServer = null @@ -194,6 +197,23 @@ class LogOffsetTest extends ZooKeeperTestHarness { assertEquals(Seq(0L), consumerOffsets) } + /* We test that `fetchOffsetsBefore` works correctly if `LogSegment.size` changes after each invocation (simulating + * a race condition) */ + @Test + def testFetchOffsetsBeforeWithChangingSegmentSize() { + val log = EasyMock.niceMock(classOf[Log]) + val logSegment = EasyMock.niceMock(classOf[LogSegment]) + EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Long] { + private val value = new AtomicLong(0) + def answer: Long = value.getAndIncrement() + }) + EasyMock.replay(logSegment) + val logSegments = Seq(logSegment) + EasyMock.expect(log.logSegments).andStubReturn(logSegments) + EasyMock.replay(log) + server.apis.fetchOffsetsBefore(log, System.currentTimeMillis, 100) + } + private def createBrokerConfig(nodeId: Int): Properties = { val props = new Properties props.put("broker.id", nodeId.toString)
