Hi Becket,

Relying on the cluster metadata doesn't seem like it would work if there
are multiple fetcher threads, right? It also doesn't work for the consumer
case, which Jay suggested would be good to handle.

Ismael

On Thu, Nov 23, 2017 at 2:21 AM, Becket Qin <becket....@gmail.com> wrote:

> Thanks for the KIP, Colin. It is an interesting idea.
>
> Thinking about the fetch protocol, at a high level, currently the following
> conveys two type of information:
> 1) what partitions I am interested in
> 2) where I am on those partitions, i.e. offsets
>
> An extreme optimization would be letting the leader know both 1) and 2)
> then the fetch request could be almost empty. I think we may be able
> achieve this when there is no leader migration.
>
> For 1) we actually kind of already have the information on each broker,
> which is the metadata. We have found that in many cases a versioned
> metadata is very helpful. With the metadata generation we can achieve 1),
> i.e. the follower do not need to tell the leader what does it interested
> in. More specifically, Assuming we add a generation to the metadata, in the
> fetch request the follower will include a metadata generation, if the
> generation matches the generation of the metadata on the leader, the leader
> will send back a response indicating that the leader knows the follower's
> interested set of partitions, so there is no need to send a full fetch
> request. Otherwise, the follower still needs to send a full fetch request
> in the next round. This will achieve the goal that unless there are leader
> migration, the followers do not need to send the full requests.
>
> There are other benefits of having a metadata generation. Those are
> orthogonal to this discussion. But since we may need it elsewhere, we need
> to introduce it at some point.
>
> For 2), there are two options, A) as Jun said, the leader can do a look up
> to know what is the last offset sent back to the follower for each
> partition, or B) the follower sends back the updated log end offset in the
> next fetch request. If we do (A), one potential optimization is that we can
> let the leader always return the offsets at index boundary or log end
> offset. For example, consider a log whose log end offset is 350, and the
> index file has an entry at offset 100, 200, 300. The leader will always try
> to return bytes at the offset boundary or log end, i.e. for each fetch
> response, the leader will try to return the data up to the highest offset
> index entry as long as the data could fit into the fetch size of the
> partition, so it could be either 100, 200, 300 or 350(LEO). If so, the
> leader will know the last returned offset without an additional log scan.
> If the leader was not able to return at the index boundary or log end
> offset, e.g. the fetch size is too small or the index bytes interval is too
> large, the leader could then fall back to lookup the offset. Alternatively,
> the leader can set a flag in the fetch response asking the follower to
> provide the fetch offset in the next fetch request.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Nov 22, 2017 at 4:53 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Colin,
> >
> > After step 3a, do we need to update the cached offset in the leader to be
> > the last offset in the data returned in the fetch response? If so, we
> need
> > another offset index lookup since the leader only knows that it gives
> out X
> > bytes in the fetch response, but not the last offset in those X bytes.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe <cmcc...@apache.org>
> wrote:
> >
> > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > > When fetching data for a partition, the leader needs to translate the
> > > > fetch offset to a position in a log segment with an index lookup. If
> > the
> > > fetch
> > > > request now also needs to cache the offset for the next fetch
> request,
> > > > there will be an extra offset index lookup.
> > >
> > > Hmm.  So the way I was thinking about it was, with an incremental fetch
> > > request, for each partition:
> > >
> > > 1a. the leader consults its cache to find the offset it needs to use
> for
> > > the fetch request
> > > 2a. the leader performs a lookup to translate the offset to a file
> index
> > > 3a. the leader reads the data from the file
> > >
> > > In contrast, with a full fetch request, for each partition:
> > >
> > > 1b. the leader looks at the FetchRequest to find the offset it needs to
> > > use for the fetch request
> > > 2b. the leader performs a lookup to translate the offset to a file
> index
> > > 3b. the leader reads the data from the file
> > >
> > > It seems like there is only one offset index lookup in both cases?  The
> > > key point is that the cache in step #1a is not stored on disk.  Or
> maybe
> > > I'm missing something here.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > > The offset index lookup can
> > > > potentially be expensive since it could require disk I/Os. One way to
> > > > optimize this a bit is to further cache the log segment position for
> > the
> > > > next offset. The tricky issue is that for a compacted topic, the
> > > > underlying
> > > > log segment could have changed between two consecutive fetch
> requests.
> > We
> > > > could potentially make that case work, but the logic will be more
> > > > complicated.
> > > >
> > > > Another thing is that it seems that the proposal only saves the
> > metadata
> > > > overhead if there are low volume topics. If we use Jay's suggestion
> of
> > > > including 0 partitions in subsequent fetch requests, it seems that we
> > > > could
> > > > get the metadata saving even if all topics have continuous traffic.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > > >
> > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > > > Hi, Jay,
> > > > > >
> > > > > > I guess in your proposal the leader has to cache the last offset
> > > given
> > > > > > back for each partition so that it knows from which offset to
> serve
> > > the
> > > > > next
> > > > > > fetch request.
> > > > >
> > > > > Hi Jun,
> > > > >
> > > > > Just to clarify, the leader has to cache the last offset for each
> > > > > follower / UUID in the original KIP-227 proposal as well.  Sorry if
> > > that
> > > > > wasn't clear.
> > > > >
> > > > > > This is doable but it means that the leader needs to do an
> > > > > > additional index lookup per partition to serve a fetch request.
> Not
> > > sure
> > > > > > if the benefit from the lighter fetch request obviously offsets
> the
> > > > > > additional index lookup though.
> > > > >
> > > > > The runtime impact should be a small constant factor at most,
> right?
> > > > > You would just have a mapping between UUID and the latest offset in
> > > each
> > > > > partition data structure.  It seems like the runtime impact of
> > looking
> > > > > up the fetch offset in a hash table (or small array) in the
> in-memory
> > > > > partition data structure should be very similar to the runtime
> impact
> > > of
> > > > > looking up the fetch offset in the FetchRequest.
> > > > >
> > > > > The extra memory consumption per partition is O(num_brokers), which
> > is
> > > > > essentially a small constant.  (The fact that brokers can have
> > multiple
> > > > > UUIDs due to parallel fetches is a small wrinkle.  But we can place
> > an
> > > > > upper bound on the number of UUIDs permitted per broker.)
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <j...@confluent.io>
> > wrote:
> > > > > >
> > > > > > > I think the general thrust of this makes a ton of sense.
> > > > > > >
> > > > > > > I don't love that we're introducing a second type of fetch
> > > request. I
> > > > > think
> > > > > > > the motivation is for compatibility, right? But isn't that what
> > > > > versioning
> > > > > > > is for? Basically to me although the modification we're making
> > > makes
> > > > > sense,
> > > > > > > the resulting protocol doesn't really seem like something you
> > would
> > > > > design
> > > > > > > this way from scratch.
> > > > > > >
> > > > > > > I think I may be misunderstanding the semantics of the
> partitions
> > > in
> > > > > > > IncrementalFetchRequest. I think the intention is that the
> server
> > > > > remembers
> > > > > > > the partitions you last requested, and the partitions you
> specify
> > > in
> > > > > the
> > > > > > > request are added to this set. This is a bit odd though because
> > you
> > > > > can add
> > > > > > > partitions but I don't see how you remove them, so it doesn't
> > > really
> > > > > let
> > > > > > > you fully make changes incrementally. I suspect I'm
> > > misunderstanding
> > > > > that
> > > > > > > somehow, though. You'd also need to be a little bit careful
> that
> > > there
> > > > > was
> > > > > > > no way for the server's idea of what the client is interested
> in
> > > and
> > > > > the
> > > > > > > client's idea to ever diverge as you made these modifications
> > over
> > > time
> > > > > > > (due to bugs or whatever).
> > > > > > >
> > > > > > > It seems like an alternative would be to not add a second
> > request,
> > > but
> > > > > > > instead change the fetch api and implementation
> > > > > > >
> > > > > > >    1. We save the partitions you last fetched on that
> connection
> > > in the
> > > > > > >    session for the connection (as I think you are proposing)
> > > > > > >    2. It only gives you back info on partitions that have data
> or
> > > have
> > > > > > >    changed (no reason you need the others, right?)
> > > > > > >    3. Not specifying any partitions means "give me the usual",
> as
> > > > > defined
> > > > > > >    by whatever you requested before attached to the session.
> > > > > > >
> > > > > > > This would be a new version of the fetch API, so compatibility
> > > would be
> > > > > > > retained by retaining the older version as is.
> > > > > > >
> > > > > > > This seems conceptually simpler to me. It's true that you have
> to
> > > > > resend
> > > > > > > the full set whenever you want to change it, but that actually
> > > seems
> > > > > less
> > > > > > > error prone and that should be rare.
> > > > > > >
> > > > > > > I suspect you guys thought about this and it doesn't quite
> work,
> > > but
> > > > > maybe
> > > > > > > you could explain why?
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <
> > cmcc...@apache.org>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I created a KIP to improve the scalability and latency of
> > > > > FetchRequest:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > > > 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
> > > > > > > > Partition+Scalability
> > > > > > > >
> > > > > > > > Please take a look.
> > > > > > > >
> > > > > > > > cheers,
> > > > > > > > Colin
> > > > > > > >
> > > > > > >
> > > > >
> > >
> >
>

Reply via email to