Thanks. I applied the patch, and everything works fine now.
> On Oct 9, 2016, at 12:39 PM, Becket Qin <becket....@gmail.com> wrote:
> 
> Can you check whether the code you are running includes KAFKA-3003?
> 
> On Sat, Oct 8, 2016 at 12:52 AM, Kafka <kafka...@126.com> wrote:
> 
>> Hi all,
>>        We found that our consumers have a high CPU load in our production
>> environment. As we know, fetch.min.bytes and fetch.wait.max.ms affect how
>> often a consumer's fetch returns, so we set both to very large values that
>> the broker can hardly satisfy.
>>        That did not solve the problem, so we checked Kafka's code and found
>> that DelayedFetch's tryComplete() function contains the following:
>> 
>>         if (endOffset.messageOffset != fetchOffset.messageOffset) {
>>           if (endOffset.onOlderSegment(fetchOffset)) {
>>             // Case C, this can happen when the new fetch operation is on a truncated leader
>>             debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
>>             return forceComplete()
>>           } else if (fetchOffset.onOlderSegment(endOffset)) {
>>             // Case C, this can happen when the fetch operation is falling behind the current segment
>>             // or the partition has just rolled a new segment
>>             debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
>>             return forceComplete()
>>           } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
>>             // we need take the partition fetch size as upper bound when accumulating the bytes
>>             accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
>>           }
>>         }
>> 
>> From this we can tell that our fetchOffset's segmentBaseOffset is not the
>> same as endOffset's segmentBaseOffset. We then checked our topic-partition's
>> segments and found that all the data in the segment had been cleaned up by
>> Kafka under log.retention.
>> We guess that the fetchOffset's segmentBaseOffset being smaller than
>> endOffset's segmentBaseOffset is what leads to this problem.
>> 
>> My question is whether we should use the following code instead, so that
>> the client uses less CPU:
>>         if (endOffset.messageOffset != fetchOffset.messageOffset) {
>>           if (endOffset.onOlderSegment(fetchOffset)) {
>>             return false
>>           } else if (fetchOffset.onOlderSegment(endOffset)) {
>>             return false
>>           }
>>         }
>> 
>> The broker would then respond after fetch.wait.max.ms in this scenario
>> instead of returning immediately.
>> 
>> Feedback is greatly appreciated. Thanks.
>> 

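For readers following the quoted snippet, the segment comparison it relies on can be sketched in isolation. This is only a simplified model written for this thread, not Kafka's actual classes: `OffsetSketch`, `Outcome`, `CompleteNow`, `Accumulate`, and `DelayedFetchSketch.check` are hypothetical names, and the assumption is that `onOlderSegment` compares segment base offsets while `positionDiff` is the byte distance between two offsets in the same segment.

```scala
// Simplified, hypothetical model of the check quoted above; NOT Kafka's real classes.
final case class OffsetSketch(messageOffset: Long, segmentBaseOffset: Long, positionInSegment: Int) {
  // Assumption: "older segment" means a strictly smaller segment base offset.
  def onOlderSegment(that: OffsetSketch): Boolean =
    segmentBaseOffset < that.segmentBaseOffset

  // Byte distance between two offsets; only meaningful within one segment.
  def positionDiff(that: OffsetSketch): Int = {
    require(segmentBaseOffset == that.segmentBaseOffset, "offsets must share a segment")
    positionInSegment - that.positionInSegment
  }
}

sealed trait Outcome
// The delayed fetch is completed at once, ignoring fetch.min.bytes and the wait time.
case object CompleteNow extends Outcome
// The partition only contributes this many bytes toward fetch.min.bytes.
final case class Accumulate(bytes: Int) extends Outcome

object DelayedFetchSketch {
  // Mirrors the branch order of the quoted snippet for a single partition.
  def check(fetchOffset: OffsetSketch, endOffset: OffsetSketch, fetchSize: Int): Outcome =
    if (endOffset.messageOffset == fetchOffset.messageOffset) Accumulate(0)
    else if (endOffset.onOlderSegment(fetchOffset)) CompleteNow
    else if (fetchOffset.onOlderSegment(endOffset)) CompleteNow
    else if (fetchOffset.messageOffset < endOffset.messageOffset)
      Accumulate(math.min(endOffset.positionDiff(fetchOffset), fetchSize))
    else Accumulate(0)
}
```

Under this model, a fetch offset whose segment base offset is below that of the log end offset always yields `CompleteNow`, however large fetch.min.bytes is, which matches the busy fetch loop described in the original message. A fetch offset on the same segment as the log end only accumulates bytes, so the fetch can still wait.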
