Xiaoyu,

Yes, this is because we rely on each segment file's last modified time as a rough estimate of when its offsets were written, and that estimate is not very accurate. In this case, can you fall back to the earliest offset?
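One way to do that on the client side is sketched below. This is a minimal sketch, not Kafka code. `OffsetFallback` and `offsetBeforeOrEarliest` are hypothetical names. `lookup` stands in for whatever actually sends the offset request. The sentinel value -2 is assumed to match `kafka.api.OffsetRequest.EarliestTime` in the Kafka version you run. The sketch tries the time-based lookup first and, if that returns no offsets, asks for the earliest offset instead.

```scala
object OffsetFallback {
  // Sentinel meaning "earliest available offset"; assumed to match
  // kafka.api.OffsetRequest.EarliestTime in the Kafka version you run.
  val EarliestTime: Long = -2L

  // `lookup` is a hypothetical stand-in for the call that sends the offset
  // request (e.g. a consumer's getOffsetsBefore) and returns the offsets the
  // broker sent back, newest first, possibly none.
  def offsetBeforeOrEarliest(lookup: Long => Seq[Long], time: Long): Option[Long] =
    lookup(time).headOption.orElse(lookup(EarliestTime).headOption)

  def main(args: Array[String]): Unit = {
    // Fake broker: every time-based lookup comes back empty (the
    // single-segment case), while the earliest-offset request returns 0.
    val fakeLookup: Long => Seq[Long] =
      t => if (t == EarliestTime) Seq(0L) else Seq.empty
    println(offsetBeforeOrEarliest(fakeLookup, 1345700000000L))
  }
}
```

Returning an `Option` keeps the "no offsets at all" case explicit rather than throwing on an empty result.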
Thanks,
Jun

On Thu, Aug 23, 2012 at 9:49 AM, xiaoyu wang <xiaoyu.w...@gmail.com> wrote:

> Hello,
>
> We recently ran into a problem getting offsets by time for a very
> low-traffic topic. The traffic is so low that Kafka always works on a
> single segment file. I checked the source code and found the following.
> The last modified time is always current, so isFound is still false when
> the loop exits, and no offsets are returned.
>
> =============================================================================
> package kafka.log
>
> import java.util.concurrent.CopyOnWriteArrayList
> import java.util.concurrent.atomic._
> import java.text.NumberFormat
> import java.io._
> import java.nio.channels.FileChannel
> import org.apache.log4j._
> import kafka.message._
> import kafka.utils._
> import kafka.common._
> import kafka.api.OffsetRequest
> import java.util._
> ....
>
>   def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
>     val segsArray = segments.view
>     var offsetTimeArray: Array[Tuple2[Long, Long]] = null
>     if (segsArray.last.size > 0)
>       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
>     else
>       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
>
>     for (i <- 0 until segsArray.length)
>       // for the active segment, lastModified is always recent
>       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
>     if (segsArray.last.size > 0)
>       offsetTimeArray(segsArray.length) = (segsArray.last.start +
>         segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
>
>     var startIndex = -1
>     request.time match {
>       case OffsetRequest.LatestTime =>
>         startIndex = offsetTimeArray.length - 1
>       case OffsetRequest.EarliestTime =>
>         startIndex = 0
>       case _ =>
>         var isFound = false
>         if (logger.isDebugEnabled) {
>           // foreach returns Unit, so build the string with map/mkString
>           logger.debug("Offset time array = " +
>             offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("; "))
>         }
>         startIndex = offsetTimeArray.length - 1
>         while (startIndex >= 0 && !isFound) {
>           // false for every entry when request.time predates the last write
>           if (offsetTimeArray(startIndex)._2 <= request.time)
>             isFound = true
>           else
>             startIndex -= 1
>         }
>     }
>
>     val retSize = request.maxNumOffsets.min(startIndex + 1)
>     val ret = new Array[Long](retSize)
>     for (j <- 0 until retSize) {
>       ret(j) = offsetTimeArray(startIndex)._1
>       startIndex -= 1
>     }
>     ret
>   }
>
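To see why the by-time lookup comes back empty for a single-segment topic, here is a small self-contained model of the search in the quoted `getOffsetsBefore`. It is a sketch, not Kafka's code. It uses plain `(offset, time)` pairs in place of Kafka's segment objects, and `OffsetLookupModel` and `offsetsBefore` are names made up for illustration. Like the original loop, it scans from the newest entry backwards for the first time that is `<=` the requested time, then returns up to `maxNumOffsets` offsets from there, newest first.

```scala
object OffsetLookupModel {
  // Each entry pairs a start offset with its estimated time (file.lastModified
  // in the quoted code, or the current time for the high-water-mark entry).
  def offsetsBefore(offsetTime: IndexedSeq[(Long, Long)], time: Long, maxNumOffsets: Int): Seq[Long] = {
    // Same result as the backwards while-loop: the last index whose time <= request time.
    val startIndex = offsetTime.lastIndexWhere { case (_, t) => t <= time }
    // startIndex is -1 when every entry is newer than `time`, so retSize is 0.
    val retSize = math.min(maxNumOffsets, startIndex + 1)
    (startIndex until startIndex - retSize by -1).map(i => offsetTime(i)._1)
  }

  def main(args: Array[String]): Unit = {
    val now = 1345700000000L
    // A single active segment: its lastModified is effectively "now", and the
    // extra high-water-mark entry is stamped with the current time as well.
    val singleSegment = IndexedSeq((0L, now - 5L), (42L, now))
    println(offsetsBefore(singleSegment, now - 3600000L, 10)) // an hour ago: no offsets
    println(offsetsBefore(singleSegment, now, 10))            // "now": 42, then 0
  }
}
```

Any request time older than the segment's last write falls before every entry, which is exactly the empty result described above.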