Glad to know :)

On Tue, Oct 18, 2016 at 1:24 AM, Json Tu <kafka...@126.com> wrote:
> Thanks. I patched it, and everything works fine now.
>
> On Oct 9, 2016, at 12:39 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Can you check if you have KAFKA-3003 when you run the code?
> >
> > On Sat, Oct 8, 2016 at 12:52 AM, Kafka <kafka...@126.com> wrote:
> >
> >> Hi all,
> >> we found that our consumer has a high CPU load in our production
> >> environment. As we know, fetch.min.bytes and fetch.wait.max.ms affect
> >> how often the consumer’s fetch returns, so we set both to very large
> >> values that the broker can hardly satisfy.
> >> The problem was still not solved, so we checked Kafka’s code and found
> >> that DelayedFetch’s tryComplete() function contains this code:
> >>
> >>     if (endOffset.messageOffset != fetchOffset.messageOffset) {
> >>       if (endOffset.onOlderSegment(fetchOffset)) {
> >>         // Case C, this can happen when the new fetch operation is on a truncated leader
> >>         debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
> >>         return forceComplete()
> >>       } else if (fetchOffset.onOlderSegment(endOffset)) {
> >>         // Case C, this can happen when the fetch operation is falling behind the current segment
> >>         // or the partition has just rolled a new segment
> >>         debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
> >>         return forceComplete()
> >>       } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
> >>         // we need take the partition fetch size as upper bound when accumulating the bytes
> >>         accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
> >>       }
> >>     }
> >>
> >> From this we can confirm that our fetchOffset’s segmentBaseOffset is not
> >> the same as endOffset’s segmentBaseOffset. We then checked our
> >> topic-partition’s segments and found that all the data in the segment
> >> had been cleaned by Kafka because of log.retention.
> >> We guess that fetchOffset’s segmentBaseOffset being smaller than
> >> endOffset’s segmentBaseOffset leads to this problem.
> >>
> >> My question is whether we should use the following code instead, so
> >> that the client uses less CPU:
> >>
> >>     if (endOffset.messageOffset != fetchOffset.messageOffset) {
> >>       if (endOffset.onOlderSegment(fetchOffset)) {
> >>         return false
> >>       } else if (fetchOffset.onOlderSegment(endOffset)) {
> >>         return false
> >>       }
> >>     }
> >>
> >> With this change, the broker would respond after fetch.wait.max.ms in
> >> this scenario instead of returning immediately.
> >>
> >> Feedback is greatly appreciated. Thanks.
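
For readers following the thread, here is a minimal, standalone sketch of the branch logic quoted above, reduced to a single partition. It is not Kafka's code: OffsetMeta, Outcome, decide and the waitOnSegmentMismatch flag are made-up names for illustration, and OffsetMeta only imitates the three fields the quoted snippet relies on. The sketch shows why raising fetch.min.bytes has no effect once the fetch offset and the end offset sit on different segments, and what the change proposed in the original message would do instead.

```scala
object DelayedFetchSketch {
  // Simplified stand-in for the offset metadata used in the quoted snippet (hypothetical type).
  final case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePosition: Int) {
    // True when this offset lives on a segment that starts before the other offset's segment.
    def onOlderSegment(that: OffsetMeta): Boolean = segmentBaseOffset < that.segmentBaseOffset
    // Byte distance between two offsets that are on the same segment.
    def positionDiff(that: OffsetMeta): Int = relativePosition - that.relativePosition
  }

  sealed trait Outcome
  case object CompleteNow extends Outcome // the fetch is answered immediately
  case object KeepWaiting extends Outcome // the fetch stays parked until more data or the wait timeout

  // Mirrors the quoted branches for one partition.
  // waitOnSegmentMismatch = true models the "return false" change proposed in the thread.
  def decide(fetchOffset: OffsetMeta,
             endOffset: OffsetMeta,
             fetchSize: Int,
             minBytes: Int,
             waitOnSegmentMismatch: Boolean = false): Outcome = {
    val offsetsDiffer = endOffset.messageOffset != fetchOffset.messageOffset
    val segmentMismatch = offsetsDiffer &&
      (endOffset.onOlderSegment(fetchOffset) || fetchOffset.onOlderSegment(endOffset))

    if (segmentMismatch) {
      // The quoted code forces completion here, no matter how large minBytes is.
      if (waitOnSegmentMismatch) KeepWaiting else CompleteNow
    } else {
      // Same segment: count the available bytes, capped by the partition fetch size.
      val accumulated =
        if (fetchOffset.messageOffset < endOffset.messageOffset)
          math.min(endOffset.positionDiff(fetchOffset), fetchSize)
        else 0
      if (accumulated >= minBytes) CompleteNow else KeepWaiting
    }
  }
}
```

With a fetch offset on segment 0 and an end offset on segment 150, decide returns CompleteNow even when minBytes is Int.MaxValue, which matches the busy fetch loop described in the original message. Passing waitOnSegmentMismatch = true returns KeepWaiting for the same input.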