Glad to know :)

On Tue, Oct 18, 2016 at 1:24 AM, Json Tu <kafka...@126.com> wrote:

> Thanks. I applied the patch, and everything works fine now.
> > On Oct 9, 2016, at 12:39 PM, Becket Qin <becket....@gmail.com> wrote:
> >
> > Can you check if you have KAFKA-3003 when you run the code?
> >
> > On Sat, Oct 8, 2016 at 12:52 AM, Kafka <kafka...@126.com> wrote:
> >
> >> Hi all,
> >> We found that our consumer has high CPU load in our production
> >> environment. As we know, fetch.min.bytes and fetch.wait.max.ms affect
> >> how often the consumer's fetch requests return, so we set both to very
> >> large values that the broker can hardly satisfy.
> >> The problem was still not solved, so we checked the Kafka code and found
> >> that DelayedFetch's tryComplete() function contains this code:
> >>
> >>         if (endOffset.messageOffset != fetchOffset.messageOffset) {
> >>           if (endOffset.onOlderSegment(fetchOffset)) {
> >>             // Case C, this can happen when the new fetch operation is on a truncated leader
> >>             debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
> >>             return forceComplete()
> >>           } else if (fetchOffset.onOlderSegment(endOffset)) {
> >>             // Case C, this can happen when the fetch operation is falling behind the current segment
> >>             // or the partition has just rolled a new segment
> >>             debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
> >>             return forceComplete()
> >>           } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
> >>             // we need take the partition fetch size as upper bound when accumulating the bytes
> >>             accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
> >>           }
> >>         }
> >>
> >> From this we can be sure that our fetchOffset's segmentBaseOffset is not
> >> the same as endOffset's segmentBaseOffset. We then checked our
> >> topic-partition's segments and found that all the data in the segment had
> >> been cleaned up by Kafka under log.retention.
> >> Our guess is that fetchOffset's segmentBaseOffset being smaller than
> >> endOffset's segmentBaseOffset is what causes this problem.
> >>
> >> My question is: should we use code like the following so that the client
> >> uses less CPU?
> >>         if (endOffset.messageOffset != fetchOffset.messageOffset) {
> >>           if (endOffset.onOlderSegment(fetchOffset)) {
> >>             return false
> >>           } else if (fetchOffset.onOlderSegment(endOffset)) {
> >>             return false
> >>           }
> >>         }
> >>
> >> The fetch would then respond after fetch.wait.max.ms in this scenario
> >> instead of returning immediately.
> >>
> >> Feedback is greatly appreciated. Thanks.
> >>
> >>
> >>
> >>
>
>
>
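
For anyone reading this thread later, the branch logic quoted above can be sketched in isolation as follows. This is only an illustration of the decision the quoted snippet makes for a single partition, not the broker's implementation: OffsetMeta, Outcome, and classify are hypothetical names, and OffsetMeta is a simplified stand-in for the offset metadata that Kafka compares (it assumes onOlderSegment compares segment base offsets and positionDiff subtracts positions within the same segment).

```scala
// Hypothetical, simplified stand-in for the offset metadata used in the quoted code.
final case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, positionInSegment: Int) {
  // Assumption: "older segment" means a strictly smaller segment base offset.
  def onOlderSegment(that: OffsetMeta): Boolean =
    segmentBaseOffset < that.segmentBaseOffset

  // Assumption: byte distance between two positions on the same segment.
  def positionDiff(that: OffsetMeta): Int =
    positionInSegment - that.positionInSegment
}

// Possible results for one partition (illustrative names only).
sealed trait Outcome
case object CompleteNow extends Outcome                 // corresponds to forceComplete()
case object NoNewData extends Outcome                   // nothing to accumulate yet
final case class Accumulate(bytes: Int) extends Outcome // bytes counted toward fetch.min.bytes

object DelayedFetchSketch {
  // Mirrors the if/else chain in the quoted tryComplete() snippet.
  def classify(fetchOffset: OffsetMeta, endOffset: OffsetMeta, fetchSize: Int): Outcome =
    if (endOffset.messageOffset == fetchOffset.messageOffset) NoNewData
    else if (endOffset.onOlderSegment(fetchOffset)) CompleteNow // fetch is ahead of a truncated leader
    else if (fetchOffset.onOlderSegment(endOffset)) CompleteNow // fetch is on an older segment
    else if (fetchOffset.messageOffset < endOffset.messageOffset)
      // The partition fetch size is the upper bound when accumulating bytes.
      Accumulate(math.min(endOffset.positionDiff(fetchOffset), fetchSize))
    else NoNewData
}
```

In this sketch, a fetch offset whose segment base offset is smaller than the end offset's always yields CompleteNow, regardless of fetch.min.bytes or fetch.wait.max.ms. That is the immediate-return behaviour described in the original report.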
