[
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14278895#comment-14278895
]
Sriharsha Chintalapani edited comment on KAFKA-1461 at 1/15/15 4:24 PM:
[~guozhang] I had the following code in mind for backing off retries in case of an error. It would live under ReplicaFetcherThread.handlePartitions.
I am thinking of maintaining two maps in ReplicaFetcherThread, one for the offset and one for the timestamp:
{code}
private val partitionsWithErrorStandbyMap = new mutable.HashMap[TopicAndPartition, Long] // (topic, partition) -> offset
private val partitionsWithErrorMap = new mutable.HashMap[TopicAndPartition, Long] // (topic, partition) -> timestamp
{code}
The idea is to remove the failed partitions from AbstractFetcherThread.partitionsMap and add them back once currentTime > partitionsWithErrorMap.timestamp + replicaFetcherRetryBackoffMs.
I am not quite sure about maintaining these two maps. If this looks ok to you I'll send a patch, or if you have any other approach please let me know.
{code}
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
  // Record the error time and stash the current offset for each newly failed partition.
  for (partition <- partitions) {
    if (!partitionsWithErrorMap.contains(partition)) {
      partitionsWithErrorMap.put(partition, System.currentTimeMillis())
      currentOffset(partition) match {
        case Some(offset: Long) =>
          partitionsWithErrorStandbyMap.put(partition, offset)
        case None => // no offset known for this partition; nothing to stash
      }
    }
  }
  removePartitions(partitions.toSet)
  val partitionsToBeAdded = new mutable.HashMap[TopicAndPartition, Long]
  // Add partitions back once their backoff time has elapsed. Collect the
  // elapsed keys first so we do not mutate the map while iterating over it.
  val elapsed = partitionsWithErrorMap.filter { case (_, timeMs) =>
    System.currentTimeMillis() > timeMs + brokerConfig.replicaFetcherRetryBackoffMs
  }.keys
  for (topicAndPartition <- elapsed) {
    partitionsWithErrorStandbyMap.get(topicAndPartition) match {
      case Some(offset: Long) =>
        partitionsToBeAdded.put(topicAndPartition, offset)
      case None => // offset was never stashed; skip re-adding
    }
    partitionsWithErrorStandbyMap.remove(topicAndPartition)
    partitionsWithErrorMap.remove(topicAndPartition)
  }
  addPartitions(partitionsToBeAdded)
}
{code}
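For what it's worth, the retry bookkeeping can be exercised in isolation. The sketch below folds the two proposed maps into a single (topic, partition) -> (offset, error time) map and injects the clock so the backoff can be tested without sleeping. BackoffTracker, recordError, and readyToRetry are made-up names for illustration only, not broker APIs:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the broker's type, for illustration only.
case class TopicAndPartition(topic: String, partition: Int)

// Folds the two proposed maps into one: (topic, partition) -> (offset, error time).
class BackoffTracker(backoffMs: Long, now: () => Long) {
  private val errored = mutable.HashMap[TopicAndPartition, (Long, Long)]()

  // Stash the offset and timestamp the first time a partition fails.
  def recordError(tp: TopicAndPartition, offset: Long): Unit =
    if (!errored.contains(tp))
      errored.put(tp, (offset, now()))

  // Return (and forget) the partitions whose backoff has elapsed,
  // mapped to the offset they should resume fetching from.
  def readyToRetry(): Map[TopicAndPartition, Long] = {
    val ready = errored.collect {
      case (tp, (offset, ts)) if now() > ts + backoffMs => tp -> offset
    }.toMap
    ready.keys.foreach(errored.remove)
    ready
  }
}
```

With a fake clock (e.g. {{var clock = 0L}} passed as {{() => clock}}), recording an error and then advancing the clock past backoffMs makes readyToRetry return the stashed offset exactly once, which is the behavior the fetcher thread needs.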
> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
> Issue Type: Improvement
> Components: replication
> Affects Versions: 0.8.1.1
> Reporter: Sam Meder
> Assignee: Sriharsha Chintalapani
> Labels: newbie++
> Fix For: 0.8.3
>
>
> The current replica fetcher thread will retry in a tight loop if any error
> occurs during the fetch call. For example, we've seen cases where the fetc