[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-10-23 Thread Flavien Raynaud (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17219704#comment-17219704
 ] 

Flavien Raynaud commented on KAFKA-8733:


Good point, we have not tried this in production. We haven't had enough 
occurrences of this issue (thankfully :p) to know how long it typically takes between 
"leader has disk issues and slow reads" and "leader dies because of disk 
failure, revoking leadership", or what a good compromise with the "a 
follower is genuinely falling behind" case would look like.
We can try this on some of our clusters with lower consistency requirements, 
but it may be weeks or months before we hit this issue again.

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We have seen the offline-partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and the hosts' disk stats, it 
> looks like the issue occurs whenever read/write operations on that disk take too 
> long. In the particular case where a read takes longer than 
> replica.lag.time.max.ms, follower replicas fall out of sync because their 
> earlier fetch requests are stuck reading the local log and their fetch 
> status has not yet been updated, as shown in the `ReplicaManager` code below. 
> If reading data from the log is delayed for longer than 
> replica.lag.time.max.ms, all the replicas fall out of sync and the 
> partition goes offline if min.insync.replicas > 1 and unclean.leader.election.enable 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   // this call took more than `replica.lag.time.max.ms`
>   val result = readFromLocalLog(
>     replicaId = replicaId,
>     fetchOnlyFromLeader = fetchOnlyFromLeader,
>     readOnlyCommitted = fetchOnlyCommitted,
>     fetchMaxBytes = fetchMaxBytes,
>     hardMaxBytesLimit = hardMaxBytesLimit,
>     readPartitionInfo = fetchInfos,
>     quota = quota,
>     isolationLevel = isolationLevel)
>   // the follower's fetch time gets updated here, but maybeShrinkIsr may already
>   // have been called and the replica removed from the ISR
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
>   else result
> }
> val logReadResults = readFromLog()
> {code}
> Attached are graphs of the disk weighted I/O time stats from when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  
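To make the timing interaction above concrete, here is a small, self-contained Scala
sketch (hypothetical names, not the actual ReplicaManager/Partition implementation) of
the check that shrinks the ISR: a follower is judged out of sync once the gap between
"now" and its last recorded catch-up time exceeds replica.lag.time.max.ms, so a follower
whose latest fetch is still stuck in a slow local-log read never refreshes that
timestamp and gets evicted even though it sent its fetch on time.

{code:java}
// Simplified illustration only; names and structure are hypothetical.
object IsrShrinkSketch {
  final case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

  // A follower is "out of sync" once it has not caught up for more than
  // maxLagMs (i.e. replica.lag.time.max.ms).
  def outOfSyncFollowers(followers: Seq[FollowerState], nowMs: Long, maxLagMs: Long): Seq[Int] =
    followers.collect { case f if nowMs - f.lastCaughtUpTimeMs > maxLagMs => f.brokerId }

  def main(args: Array[String]): Unit = {
    val maxLagMs = 30000L // replica.lag.time.max.ms
    val now = 100000L
    // Follower 2 sent its fetch on time, but that fetch is still blocked inside
    // the leader's slow readFromLocalLog call, so its timestamp was never refreshed.
    val followers = Seq(
      FollowerState(brokerId = 1, lastCaughtUpTimeMs = now - 5000L),
      FollowerState(brokerId = 2, lastCaughtUpTimeMs = now - 45000L))
    println(outOfSyncFollowers(followers, now, maxLagMs)) // List(2)
  }
}
{code}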





[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-10-21 Thread Flavien Raynaud (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17218341#comment-17218341
 ] 

Flavien Raynaud commented on KAFKA-8733:


Hi [~d8tltanc], thank you for looking into this! It's definitely still an issue 
for us. Even though it happens quite rarely, it is really painful when it does 
happen.
We haven't found any decent way to mitigate this issue; we have only built some 
internal tooling to make recovery faster (identifying impacted partitions, 
making sure the replica set of said partitions does not contain empty replicas, 
enabling unclean leader election if needed, and reassigning to a replacement 
broker).
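For reference, the kind of recovery step described above can be sketched with the Kafka
AdminClient. The snippet below is a rough, hypothetical sketch (topic name and bootstrap
server are placeholders, there is no error handling, and it only covers two of the steps:
finding partitions with no leader and enabling unclean leader election on their topic);
it is not our actual internal tooling.

{code:java}
import java.util.Properties
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource
import scala.jdk.CollectionConverters._

object OfflinePartitionRecoverySketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    val admin = Admin.create(props)
    try {
      val topic = "example-topic" // hypothetical topic
      val description =
        admin.describeTopics(java.util.Collections.singletonList(topic)).all().get().get(topic)

      // Partitions that currently have no leader are the offline ones.
      val offline = description.partitions().asScala
        .filter(p => p.leader() == null || p.leader().isEmpty)
      offline.foreach(p => println(s"offline: $topic-${p.partition()}"))

      if (offline.nonEmpty) {
        // Enable unclean leader election on the topic (data loss is possible).
        val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
        val op = new AlterConfigOp(
          new ConfigEntry("unclean.leader.election.enable", "true"), AlterConfigOp.OpType.SET)
        admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all().get()
      }
    } finally admin.close()
  }
}
{code}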






[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-08-24 Thread Flavien Raynaud (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17183304#comment-17183304
 ] 

Flavien Raynaud commented on KAFKA-8733:


Has there been any update regarding this issue or the associated KIP? I can see 
that the thread on the mailing list has been quiet for the past 6 months. The issue 
happened again recently when one broker encountered a disk failure, causing 
a bunch of offline partitions. Happy to help in any way we can.






[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-01-21 Thread Flavien Raynaud (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020083#comment-17020083
 ] 

Flavien Raynaud commented on KAFKA-8733:


We've seen offline partitions happening for the same reason in one of our 
clusters too, where only the leader broker for the offline partitions was 
having disk issues. It looks like there has not been much progress on (or review 
of) the PR submitted since December 9th. Is there anything blocking this change from 
moving forward? :)






[jira] [Commented] (KAFKA-7286) Loading offsets and group metadata hangs with large group metadata records

2018-09-10 Thread Flavien Raynaud (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16608869#comment-16608869
 ] 

Flavien Raynaud commented on KAFKA-7286:


In our case, we did not have that many members (around 30). 
However, each member was responsible for a lot of topic-partitions (>500), which 
ended up producing these large metadata records.






[jira] [Commented] (KAFKA-7286) Loading offsets and group metadata hangs with large group metadata records

2018-08-22 Thread Flavien Raynaud (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589263#comment-16589263
 ] 

Flavien Raynaud commented on KAFKA-7286:


Any chance anyone could have a look at it? :) 






[jira] [Assigned] (KAFKA-7286) Loading offsets and group metadata hangs with large group metadata records

2018-08-21 Thread Flavien Raynaud (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavien Raynaud reassigned KAFKA-7286:
--

Assignee: Flavien Raynaud






[jira] [Created] (KAFKA-7286) Loading offsets and group metadata hangs with large group metadata records

2018-08-13 Thread Flavien Raynaud (JIRA)
Flavien Raynaud created KAFKA-7286:
--

 Summary: Loading offsets and group metadata hangs with large group 
metadata records
 Key: KAFKA-7286
 URL: https://issues.apache.org/jira/browse/KAFKA-7286
 Project: Kafka
  Issue Type: Bug
Reporter: Flavien Raynaud


When a (Kafka-based) consumer group contains many members, group metadata 
records (in the {{__consumer_offsets}} topic) can become quite large.

Increasing {{message.max.bytes}} makes storing these records possible.
 Loading them when a broker restarts is done via 
[doLoadGroupsAndOffsets|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L504].
 However, this method relies on the {{offsets.load.buffer.size}} configuration 
to create a 
[buffer|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L513]
 that will contain the records being loaded.

If a group metadata record is too large for this buffer, the loading method 
will get stuck trying to load records (in a tight loop) into a buffer that 
cannot accommodate a single record.

For example, if the {{__consumer_offsets-9}} partition contains a record 
smaller than {{message.max.bytes}} but larger than 
{{offsets.load.buffer.size}}, logs would indicate the following:
{noformat}
...
[2018-08-13 21:00:21,073] INFO [GroupMetadataManager brokerId=0] Scheduling 
loading of offsets and group metadata from __consumer_offsets-9 
(kafka.coordinator.group.GroupMetadataManager)
...
{noformat}
But logs will never contain the expected {{Finished loading offsets and group 
metadata from ...}} line.

Consumers whose group is assigned to this partition will see {{Marking the 
coordinator dead}} and will never be able to stabilize or make progress.

From what I could gather in the code, it seems that:
 - 
[fetchDataInfo|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L522]
 returns at least one record (even if larger than {{offsets.load.buffer.size}}, 
thanks to {{minOneMessage = true}})
 - No fully-readable record is stored in the buffer with 
[fileRecords.readInto(buffer, 
0)|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L528]
 (too large to fit in the buffer)
 - 
[memRecords.batches|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L532]
 returns an empty iterator
 - 
[currOffset|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L590]
 never advances, hence loading the partition hangs forever.


It would be great to let the partition load even if a record is larger than the 
configured {{offsets.load.buffer.size}} limit. The fact that {{minOneMessage = 
true}} is used when reading records seems to indicate that the buffer should be able 
to accommodate at least one record.

If you think the limit should stay a hard limit, then it would at least be worth adding 
a log line indicating that {{offsets.load.buffer.size}} is not large enough and should be 
increased. Otherwise, one can only guess and dig through the code to figure out 
what is happening :)

I will try to open a PR with the first idea (allowing large records to be read 
when needed) soon, but any feedback from anyone who also had the same issue in 
the past would be appreciated :)
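To illustrate the hang (and the proposed fix) outside of the broker, here is a small,
self-contained Scala sketch with hypothetical names; it is not the GroupMetadataManager
code, just the same control flow: only batches that fit entirely in the buffer are
visible to the caller, so when the next batch is larger than the buffer the set of
readable batches is empty, the current offset never advances, and the loop spins
forever unless the buffer is grown to hold at least that one batch.

{code:java}
// Simulation only; names and structure are hypothetical.
object LoadBufferSketch {
  final case class Batch(baseOffset: Long, sizeInBytes: Int)

  // Mimics readInto(buffer, 0) + memRecords.batches: only batches that fit
  // entirely in the buffer are visible to the caller.
  def readableBatches(log: Seq[Batch], fromOffset: Long, bufferSize: Int): Seq[Batch] =
    log.filter(_.baseOffset >= fromOffset).takeWhile(_.sizeInBytes <= bufferSize)

  def loadOffsets(log: Seq[Batch], loadBufferSize: Int, growBufferIfNeeded: Boolean): Long = {
    var currOffset = 0L
    var bufferSize = loadBufferSize
    val endOffset = log.map(_.baseOffset).max + 1
    while (currOffset < endOffset) {
      val batches = readableBatches(log, currOffset, bufferSize)
      if (batches.isEmpty) {
        if (growBufferIfNeeded)
          // proposed fix: grow the buffer so it can hold at least the next batch
          bufferSize = log.find(_.baseOffset >= currOffset).map(_.sizeInBytes).getOrElse(bufferSize)
        else
          // the real code would spin forever here; throw to keep the sketch terminating
          throw new IllegalStateException(
            s"stuck at offset $currOffset: next batch does not fit in $bufferSize bytes")
      } else {
        currOffset = batches.last.baseOffset + 1 // advance past the batches we consumed
      }
    }
    currOffset
  }

  def main(args: Array[String]): Unit = {
    // An 8 KB group metadata record with a 4 KB load buffer.
    val log = Seq(Batch(0L, 512), Batch(1L, 8192), Batch(2L, 256))
    println(loadOffsets(log, loadBufferSize = 4096, growBufferIfNeeded = true)) // 3
  }
}
{code}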


