Hello,

We recently run into problem getting offsets by time for a very low traffic
topic. Basically, the traffic is so low that kafka always work on one
segment file. I checked the source code and found the following. The last
modified time is always current and therefore isFound is false when it exit
the loop and it returns no offsets.


=============================================================================
package kafka.log

import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic._
import java.text.NumberFormat
import java.io._
import java.nio.channels.FileChannel
import org.apache.log4j._
import kafka.message._
import kafka.utils._
import kafka.common._
import kafka.api.OffsetRequest
import java.util._
....

 def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
    val segsArray = segments.view
    var offsetTimeArray: Array[Tuple2[Long, Long]] = null
    if (segsArray.last.size > 0)
      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
    else
      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)

    for (i <- 0 until segsArray.length)
 *     offsetTimeArray(i) = (segsArray(i).start,
segsArray(i).file.lastModified)*
    if (segsArray.last.size > 0)
      offsetTimeArray(segsArray.length) = (segsArray.last.start +
segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)

    var startIndex = -1
    request.time match {
      case OffsetRequest.LatestTime =>
        startIndex = offsetTimeArray.length - 1
      case OffsetRequest.EarliestTime =>
        startIndex = 0
      case _ =>
          var isFound = false
          if(logger.isDebugEnabled) {
            logger.debug("Offset time array = " + offsetTimeArray.foreach(o
=> "%d, %d".format(o._1, o._2)))
          }
          startIndex = offsetTimeArray.length - 1
          while (startIndex >= 0 && !isFound) {
*            if (offsetTimeArray(startIndex)._2 <= request.time)*
              isFound = true
            else
              startIndex -=1
          }
    }

    val retSize = request.maxNumOffsets.min(startIndex + 1)
    val ret = new Array[Long](retSize)
    for (j <- 0 until retSize) {
      ret(j) = offsetTimeArray(startIndex)._1
      startIndex -= 1
    }
    ret
  }

Reply via email to