Xiaoyu,

Yes, this happens because we rely on each segment file's last-modified time
as a rough estimate of when its offsets were written, and that estimate is
not very accurate. In this case, can you fall back to the earliest offset?
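A minimal client-side sketch of that fallback, under stated assumptions: `OffsetFallback`, `startOffset` and the `fetchOffsets` parameter are hypothetical names, with `fetchOffsets(time, maxNumOffsets)` standing in for whatever call sends the OffsetRequest. The -2L sentinel is assumed to match the value of OffsetRequest.EarliestTime.

```scala
object OffsetFallback {
  // Assumed "earliest" sentinel, matching the value of Kafka's OffsetRequest.EarliestTime.
  val EarliestTime: Long = -2L

  // fetchOffsets(time, maxNumOffsets) is a hypothetical stand-in for the call
  // that sends an OffsetRequest and returns the offsets the broker reports.
  def startOffset(time: Long, fetchOffsets: (Long, Int) => Array[Long]): Option[Long] = {
    val byTime = fetchOffsets(time, 1)
    if (byTime.nonEmpty) Some(byTime.head)        // time-based lookup succeeded
    else fetchOffsets(EarliestTime, 1).headOption // otherwise fall back to the earliest offset
  }
}
```

For example, with a fake `fetchOffsets` that returns nothing for a real timestamp but `Array(0L)` for -2L (the low-traffic case), `startOffset` returns `Some(0L)`.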

Thanks,

Jun

On Thu, Aug 23, 2012 at 9:49 AM, xiaoyu wang <xiaoyu.w...@gmail.com> wrote:

> Hello,
>
> We recently ran into a problem getting offsets by time for a very low-traffic
> topic. Basically, the traffic is so low that Kafka always works on a single
> segment file. I checked the source code and found the following. The
> last-modified time is always current, so isFound is still false when the
> loop exits, and no offsets are returned.
>
> =============================================================================
> package kafka.log
>
> import java.util.concurrent.CopyOnWriteArrayList
> import java.util.concurrent.atomic._
> import java.text.NumberFormat
> import java.io._
> import java.nio.channels.FileChannel
> import org.apache.log4j._
> import kafka.message._
> import kafka.utils._
> import kafka.common._
> import kafka.api.OffsetRequest
> import java.util._
> ....
>
>   def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
>     val segsArray = segments.view
>     var offsetTimeArray: Array[Tuple2[Long, Long]] = null
>     if (segsArray.last.size > 0)
>       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
>     else
>       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
>
>     // Each segment is stamped with its file's last-modified time. When a
>     // topic only ever writes to one segment, that time is always current.
>     for (i <- 0 until segsArray.length)
>       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
>     // A non-empty last segment adds one more entry: its high-water mark, stamped "now".
>     if (segsArray.last.size > 0)
>       offsetTimeArray(segsArray.length) =
>         (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
>
>     var startIndex = -1
>     request.time match {
>       case OffsetRequest.LatestTime =>
>         startIndex = offsetTimeArray.length - 1
>       case OffsetRequest.EarliestTime =>
>         startIndex = 0
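>       case _ =>
>         var isFound = false
>         if(logger.isDebugEnabled) {
>           // Note: foreach returns Unit, so this logs "()" rather than the array contents.
>           logger.debug("Offset time array = " +
>             offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
>         }
>         startIndex = offsetTimeArray.length - 1
>         // Walk backwards to the newest entry stamped at or before request.time.
>         // If every timestamp is "now", nothing matches and startIndex ends at -1.
>         while (startIndex >= 0 && !isFound) {
>           if (offsetTimeArray(startIndex)._2 <= request.time)
>             isFound = true
>           else
>             startIndex -= 1
>         }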
>     }
>
>     val retSize = request.maxNumOffsets.min(startIndex + 1)
>     val ret = new Array[Long](retSize)
>     for (j <- 0 until retSize) {
>       ret(j) = offsetTimeArray(startIndex)._1
>       startIndex -= 1
>     }
>     ret
>   }
>
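To see why a single-segment topic returns nothing, here is a simplified, self-contained model of the lookup above. It is not Kafka's code: `OffsetLookupSketch` and `offsetsBefore` are hypothetical names, and the caller passes in the (start offset, timestamp) pairs directly. It applies the same backward scan and the same `maxNumOffsets` cap.

```scala
object OffsetLookupSketch {
  // offsetTimes: (start offset, timestamp) pairs, oldest first, including the
  // extra high-water-mark entry for a non-empty last segment.
  def offsetsBefore(offsetTimes: Array[(Long, Long)], time: Long, maxNumOffsets: Int): Array[Long] = {
    // Index of the newest entry whose timestamp is <= time, or -1 if none.
    val startIndex = offsetTimes.lastIndexWhere((p: (Long, Long)) => p._2 <= time)
    // Up to maxNumOffsets start offsets, newest first; empty when startIndex is -1.
    (startIndex to 0 by -1).take(maxNumOffsets).map(i => offsetTimes(i)._1).toArray
  }
}
```

With one active segment, both entries carry roughly the current time, so any request time in the past matches nothing: `offsetsBefore(Array((0L, now), (500L, now)), now - 100, 10)` is empty. Only once older segments exist with older last-modified times does a past timestamp find a match.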
