Hi team,
I have been reading the source code recently. The logic of the method below
looks like it is meant to collect all timed-out requests, but it actually
removes only the first expired request for each broker. Is this a deliberate
design? Could it cause some callbacks to be lost? Looking forward to your reply!
Source file: kafka.common.InterBrokerSendThread.scala
  def removeAllTimedOut(now: Long): Collection[ClientRequest] = {
    val expiredRequests = new ArrayList[ClientRequest]
    for (requests <- unsent.values.asScala) {
      val requestIterator = requests.iterator
      var foundExpiredRequest = false
      while (requestIterator.hasNext && !foundExpiredRequest) {
        val request = requestIterator.next
        val elapsedMs = Math.max(0, now - request.createdTimeMs)
        if (elapsedMs > request.requestTimeoutMs) {
          expiredRequests.add(request)
          requestIterator.remove()
          foundExpiredRequest = true
        }
      }
    }
    expiredRequests
  }
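
To make the question concrete, here is a minimal standalone sketch of the behavior I mean. It is not Kafka code: `Req`, `TimedOutSketch`, `removeFirstTimedOut`, `removeEveryTimedOut` and `sampleQueue` are hypothetical names I made up, `Req` only carries the two fields the method reads, and I am assuming Scala 2.13 for `scala.jdk.CollectionConverters`. The first function copies the loop above; the second is the same loop without the `foundExpiredRequest` flag.

```scala
import java.util.{ArrayDeque, ArrayList, Collection, HashMap}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for ClientRequest (assumption, not the Kafka class).
final case class Req(id: String, createdTimeMs: Long, requestTimeoutMs: Long)

object TimedOutSketch {
  type Unsent = HashMap[String, ArrayDeque[Req]]

  private def isExpired(r: Req, now: Long): Boolean =
    math.max(0L, now - r.createdTimeMs) > r.requestTimeoutMs

  // Same shape as the method above: stops at the first expired request per node.
  def removeFirstTimedOut(unsent: Unsent, now: Long): Collection[Req] = {
    val expired = new ArrayList[Req]
    for (requests <- unsent.values.asScala) {
      val it = requests.iterator
      var found = false
      while (it.hasNext && !found) {
        val r = it.next()
        if (isExpired(r, now)) {
          expired.add(r)
          it.remove()
          found = true
        }
      }
    }
    expired
  }

  // Variant without the flag: removes every expired request per node.
  def removeEveryTimedOut(unsent: Unsent, now: Long): Collection[Req] = {
    val expired = new ArrayList[Req]
    for (requests <- unsent.values.asScala) {
      val it = requests.iterator
      while (it.hasNext) {
        val r = it.next()
        if (isExpired(r, now)) {
          expired.add(r)
          it.remove()
        }
      }
    }
    expired
  }

  // One node with three queued requests; at now = 500, "a" and "b" are
  // past their 100 ms timeout and "c" is not.
  def sampleQueue(): Unsent = {
    val queue = new ArrayDeque[Req]
    queue.add(Req("a", 0L, 100L))
    queue.add(Req("b", 10L, 100L))
    queue.add(Req("c", 400L, 100L))
    val unsent = new Unsent
    unsent.put("node-1", queue)
    unsent
  }
}
```

With `sampleQueue()` and `now = 500`, a single call to `removeFirstTimedOut` returns only request "a" and leaves the expired request "b" in the queue, while `removeEveryTimedOut` returns both "a" and "b". In the sketch, "b" is only picked up by a later call, so my question is whether the real method relies on being called repeatedly in the same way.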
Best
tiegen