Hello, We recently run into problem getting offsets by time for a very low traffic topic. Basically, the traffic is so low that kafka always work on one segment file. I checked the source code and found the following. The last modified time is always current and therefore isFound is false when it exit the loop and it returns no offsets.
============================================================================= package kafka.log import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic._ import java.text.NumberFormat import java.io._ import java.nio.channels.FileChannel import org.apache.log4j._ import kafka.message._ import kafka.utils._ import kafka.common._ import kafka.api.OffsetRequest import java.util._ .... def getOffsetsBefore(request: OffsetRequest): Array[Long] = { val segsArray = segments.view var offsetTimeArray: Array[Tuple2[Long, Long]] = null if (segsArray.last.size > 0) offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1) else offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length) for (i <- 0 until segsArray.length) * offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)* if (segsArray.last.size > 0) offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds) var startIndex = -1 request.time match { case OffsetRequest.LatestTime => startIndex = offsetTimeArray.length - 1 case OffsetRequest.EarliestTime => startIndex = 0 case _ => var isFound = false if(logger.isDebugEnabled) { logger.debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2))) } startIndex = offsetTimeArray.length - 1 while (startIndex >= 0 && !isFound) { * if (offsetTimeArray(startIndex)._2 <= request.time)* isFound = true else startIndex -=1 } } val retSize = request.maxNumOffsets.min(startIndex + 1) val ret = new Array[Long](retSize) for (j <- 0 until retSize) { ret(j) = offsetTimeArray(startIndex)._1 startIndex -= 1 } ret }