[jira] [Comment Edited] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-02-13 Thread Idcmp (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14320746#comment-14320746
 ] 

Idcmp edited comment on KAFKA-1461 at 2/13/15 9:04 PM:
---

This issue can be tickled in a multi-broker configuration by having brokers 
advertise host names that do not exist (say, for example, you're running Kafka 
in Docker containers with custom hostnames :-)
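
For anyone trying to reproduce this, a server.properties fragment along these 
lines should do it; the hostname below is made up for illustration, and the 
only requirement is that the advertised name not resolve from the brokers 
doing the fetching:

{code}
# server.properties fragment -- illustrative values only
broker.id=1
port=9092
# Resolvable inside the docker network (or not at all), but not by the
# other brokers that try to fetch from this one:
advertised.host.name=kafka-1.docker.internal
advertised.port=9092
{code}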


was (Author: idcmp):
This issue can be tickled on a multi-broker configuration by having brokers 
advertise host names that do not exist. 


> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.
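
A minimal sketch of the behavior being asked for, assuming a doWork-style 
fetch loop; the names below are placeholders rather than the committed patch, 
and the point is only that an error should put the thread to sleep instead of 
letting it immediately re-issue the fetch:

{code}
// Illustrative only: back off on error instead of spinning in a tight loop.
// fetchBackoffMs and the fetch/processing calls below are placeholders.
val fetchBackoffMs = 1000L // e.g. driven by a replica.fetch.backoff.ms config

override def doWork() {
  try {
    val response = simpleConsumer.fetch(fetchRequest) // placeholder fetch call
    processFetchResponse(response)
  } catch {
    case t: Throwable =>
      warn("Fetch failed, backing off " + fetchBackoffMs + " ms before retrying", t)
      Thread.sleep(fetchBackoffMs)
  }
}
{code}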



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-12 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14359359#comment-14359359
 ] 

Joe Stein edited comment on KAFKA-1461 at 3/12/15 8:42 PM:
---

Here is my reasoning. Say you are an operations person, and in the next 
release we tell folks to read the KIPs to learn and understand the changes 
that affect them (the usual yada yada language for the release), and something 
like this isn't in there. We are changing the behavior of an existing config 
and removing another; leaving that out of the KIP would make the communication 
of behavior changes for the release incongruent. So, while I agree we don't 
"need it" technically, this consistency reason is why I even brought it up. I 
was just looking at it from the release perspective, at what ops folks are 
going to be looking at when we get there.


was (Author: joestein):
Here is my reasoning. Say you are an operations person. And, in the next 
release we tell folks about the KIP to learn and understand changes that affect 
them (yada yada language for the release). And something like this isn't in 
there. We are changing the behavior of an existing config and removing another. 
It makes the communication of behavior incongruent for the changes of a 
release. So, while I agree we don't need it for this the reason I even brought 
it up was looking at it from the release perspective for what ops folks are 
going to be looking at when we get there.

> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
> KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-01-15 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14278895#comment-14278895
 ] 

Sriharsha Chintalapani edited comment on KAFKA-1461 at 1/15/15 4:24 PM:
---

[~guozhang] I had the following code in mind for backing off retries in case of 
any error. This code would live in ReplicaFetcherThread.handlePartitionsWithErrors.
I am thinking of maintaining two maps in ReplicaFetcherThread:
  private val partitionsWithErrorStandbyMap = new mutable.HashMap[TopicAndPartition, Long] // (topic, partition) -> offset
  private val partitionsWithErrorMap = new mutable.HashMap[TopicAndPartition, Long] // (topic, partition) -> timestamp
one for the last fetched offset and one for the failure timestamp.
The idea is to remove the erroring partitions from 
AbstractFetcherThread.partitionsMap and add them back once currentTime > 
partitionsWithErrorMap.timestamp + replicaFetcherRetryBackoffMs.
I am not quite sure about maintaining these two maps. If this looks ok to you, 
I'll send a patch; if you have any other approach, please let me know. 

{code}
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
    // Record the failure time and last fetched offset for each newly erroring partition.
    for (partition <- partitions) {
      if (!partitionsWithErrorMap.contains(partition)) {
        partitionsWithErrorMap.put(partition, System.currentTimeMillis())
        currentOffset(partition) match {
          case Some(offset) => partitionsWithErrorStandbyMap.put(partition, offset)
          case None => // no offset known yet, nothing to stash
        }
      }
    }
    // Stop fetching the erroring partitions while they back off.
    removePartitions(partitions.toSet)
    // Collect the partitions whose back-off interval has elapsed.
    val now = System.currentTimeMillis()
    val retryable = partitionsWithErrorMap.collect {
      case (topicAndPartition, timeMs)
        if now > timeMs + brokerConfig.replicaFetcherRetryBackoffMs => topicAndPartition
    }
    // Add them back at their stashed offsets and forget their error state, so
    // they are not re-added a second time on the next invocation.
    val partitionsToBeAdded = new mutable.HashMap[TopicAndPartition, Long]
    retryable.foreach { topicAndPartition =>
      partitionsWithErrorStandbyMap.remove(topicAndPartition).foreach { offset =>
        partitionsToBeAdded.put(topicAndPartition, offset)
      }
      partitionsWithErrorMap.remove(topicAndPartition)
    }
    addPartitions(partitionsToBeAdded)
  }
{code}
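
For context, a rough sketch of how that method would be driven from the fetch 
loop; the shape follows AbstractFetcherThread, but the helper below is an 
assumption, not the actual 0.8.x code:

{code}
// Sketch only: processFetchRequest here is assumed to return the partitions
// whose fetch response carried an error code; the real signatures differ.
override def doWork() {
  val partitionsWithError = processFetchRequest(fetchRequest)
  if (partitionsWithError.nonEmpty)
    handlePartitionsWithErrors(partitionsWithError)
}
{code}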



> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.