Hello,
We recently ran into a problem getting offsets by time for a very low-traffic
topic. The traffic is so low that Kafka always writes to a single segment file.
I checked the source code and found the following: the last-modified time of
that segment file is always close to the current time, so no entry in
offsetTimeArray has a timestamp at or before the requested time. As a result,
isFound is still false when the loop exits, and no offsets are returned.
=============================================================================
package kafka.log
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic._
import java.text.NumberFormat
import java.io._
import java.nio.channels.FileChannel
import org.apache.log4j._
import kafka.message._
import kafka.utils._
import kafka.common._
import kafka.api.OffsetRequest
import java.util._
....
/**
 * Returns up to `request.maxNumOffsets` segment-boundary offsets for this log,
 * newest first, selected according to `request.time`.
 *
 * A table of (offset, timestamp) pairs is built with one entry per segment: the
 * segment's start offset paired with its file's last-modified time. When the last
 * segment is non-empty, one more entry is appended for
 * `last.start + last.messageSet.highWaterMark`, stamped with the current time.
 *
 *  - `OffsetRequest.LatestTime`: starts from the final table entry.
 *  - `OffsetRequest.EarliestTime`: starts from the first table entry, so at most
 *    one offset is returned.
 *  - any other value: starts from the newest entry whose timestamp is
 *    <= `request.time`.
 *
 * Offsets are then collected walking backwards through the table from that start.
 *
 * Limitation: the timestamps are segment-file last-modified times, not per-message
 * times. A segment that is still being appended to has a last-modified time close to
 * "now". A request for a time earlier than the last-modified time of every segment
 * therefore matches nothing and returns an empty array. This happens, for example,
 * on a low-traffic topic whose log still has only one segment.
 *
 * @param request carries the target time and the maximum number of offsets wanted
 * @return the selected offsets, newest first; empty when there are no segments,
 *         when no entry qualifies, or when `maxNumOffsets` is not positive
 */
def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
  val segsArray = segments.view
  if (segsArray.isEmpty) {
    // No segments means no offsets; this also keeps `segsArray.last` below from throwing.
    new Array[Long](0)
  } else {
    val lastSegment = segsArray.last
    // Only a non-empty active segment contributes the extra "next offset" entry.
    val hasTailEntry = lastSegment.size > 0

    val offsetTimeArray = new Array[(Long, Long)](
      if (hasTailEntry) segsArray.length + 1 else segsArray.length)
    for (i <- 0 until segsArray.length)
      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
    if (hasTailEntry)
      offsetTimeArray(segsArray.length) =
        (lastSegment.start + lastSegment.messageSet.highWaterMark, SystemTime.milliseconds)

    val startIndex: Int = request.time match {
      case OffsetRequest.LatestTime =>
        offsetTimeArray.length - 1
      case OffsetRequest.EarliestTime =>
        0
      case targetTime =>
        if (logger.isDebugEnabled) {
          // Render the table itself; `foreach` returns Unit and would only log "()".
          logger.debug("Offset time array = " +
            offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("[", "; ", "]"))
        }
        // Newest entry whose timestamp is not after the requested time; -1 when none is.
        offsetTimeArray.lastIndexWhere(_._2 <= targetTime)
    }

    // Clamp at 0 so a negative maxNumOffsets yields an empty result instead of
    // a NegativeArraySizeException.
    val retSize = request.maxNumOffsets.min(startIndex + 1).max(0)
    // Walk backwards from startIndex so the offsets come out newest first.
    Array.tabulate(retSize)(j => offsetTimeArray(startIndex - j)._1)
  }
}