Thanks. I patch it, and everything goes ok.
> 在 2016年10月9日,下午12:39,Becket Qin <becket....@gmail.com> 写道:
>
> Can you check if you have KAFKA-3003 when you run the code?
>
> On Sat, Oct 8, 2016 at 12:52 AM, Kafka <kafka...@126.com> wrote:
>
>> Hi all,
>> we found our consumer have high cpu load in our product
>> enviroment,as we know,fetch.min.bytes and fetch.wait.ma <
>> http://fetch.wait.ma/>x.ms will affect the frequency of consumer’s return,
>> so we adjust them to very big so that broker is very hard to satisfy it.
>> then we found the problem is not be solved,then we check the
>> kafka’s code,we check delayedFetch’s tryComplete() function has these codes,
>>
>> if (endOffset.messageOffset != fetchOffset.messageOffset) {
>> if (endOffset.onOlderSegment(fetchOffset)) {
>> // Case C, this can happen when the new fetch operation is
>> on a truncated leader
>> debug("Satisfying fetch %s since it is fetching later
>> segments of partition %s.".format(fetchMetadata, topicAndPartition))
>> return forceComplete()
>> } else if (fetchOffset.onOlderSegment(endOffset)) {
>> // Case C, this can happen when the fetch operation is
>> falling behind the current segment
>> // or the partition has just rolled a new segment
>> debug("Satisfying fetch %s immediately since it is
>> fetching older segments.".format(fetchMetadata))
>> return forceComplete()
>> } else if (fetchOffset.messageOffset <
>> endOffset.messageOffset) {
>> // we need take the partition fetch size as upper bound
>> when accumulating the bytes
>> accumulatedSize +=
>> math.min(endOffset.positionDiff(fetchOffset),
>> fetchStatus.fetchInfo.fetchSize)
>> }
>> }
>>
>> so we can ensure that our fetchOffset’s segmentBaseOffset is not the same
>> as endOffset’s segmentBaseOffset,then we check our topic-partition’s
>> segment, we found the data in the segment is all cleaned by the kafka for
>> log.retention.
>> and we guess that the fetchOffset’s segmentBaseOffset is smaller than
>> endOffset’s segmentBaseOffset leads this problem.
>>
>> but my point is should we use we use these code to make client use less
>> cpu,
>> if (endOffset.messageOffset != fetchOffset.messageOffset) {
>> if (endOffset.onOlderSegment(fetchOffset)) {
>> return false
>> } else if (fetchOffset.onOlderSegment(endOffset)) {
>> return false
>> }
>> }
>>
>> and then it will response after fetch.wait.ma <http://fetch.wait.ma/>x.ms
>> in this scene instead of immediately return.
>>
>> Feedback is greatly appreciated. Thanks.
>>
>>
>>
>>