Hey Colin,

WRT memory management I think what you are saying is that you would add a
field to the fetch request which would request that the server cache the
set of partitions and the response would have a field indicating whether
that happened or not. This would allow a bound on memory.

I was also thinking there could be mechanical improvements that would help
efficiency such as sharing topic name or TopicPartition objects to reduce
the footprint in a flyweight style. If you think about it there is already
some memory overhead on a per-connection basis for socket buffers and
purgatory so a little more might be okay.

-Jay

On Wed, Nov 22, 2017 at 1:46 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Wed, Nov 22, 2017, at 13:43, Colin McCabe wrote:
> > On Wed, Nov 22, 2017, at 13:08, Jay Kreps wrote:
> > > Okay yeah, what I said didn't really work or make sense. Ismael's
> > > interpretation is better.
> > >
> > > Couple of things to point out:
> > >
> > >    1. I'm less sure that replication has a high partition count and
> > >    consumers don't. There are definitely use cases for consumers that
> > >    subscribe to everything (e.g. load all my data into HDFS) as well as
> > >    super high partition count topics. In a bigger cluster it is
> unlikely a
> > >    given node is actually replicating that many partitions from another
> > >    particular node (though perhaps in aggregate the effect is the
> same).
> > >    I think it would clearly be desirable to have a solution that
> targeted
> > >    both the consumer and replication if that were achievable.
> >
> > Hmm.  I hadn't considered the possibility that consumers might want to
> > subscribe to a huge number of topics.  That's a fair point (especially
> > with the replication example).
> >
> > >    I agree with the concern on memory, but perhaps there could be a
> way to
> > >    be smart about the memory usage?
> >
> > One approach would be to let clients compete for a configurable number
> > of cache slots on the broker.  So only the first N clients to ask for an
> > incremental fetch request UUID would receive one.  You could combine
> > this with making the clients not request an incremental fetch request
> > unless they were following more than some configurable number of
> > partitions (like 10).  That way you wouldn't waste all your cache slots
> > on clients that were only following 1 or 2 partitions, and hence
> > wouldn't benefit much from the optimization.
>
> By the way, I was envisioning the cache slots as something that would
> time out.  So if a client created an incremental fetch UUID and then
> disappeared, we'd eventually purge its cached offsets and let someone
> else use the memory.
>
> C.
>
> >
> > This is basically a bet on the idea that if you have clients following a
> > huge number of partitions, you probably will only have a limited number
> > of such clients.  Arguably, if you have a huge number of clients
> > following a huge number of partitions, you are going to have performance
> > problems anyway.
> >
> > >    2. For the question of one request vs two, one difference in values
> > >    here may be that it sounds like you are proposing a less ideal
> protocol to
> > >    simplify the broker code. To me the protocol is really *the*
> > >    fundamental interface in Kafka and we should really strive to make
> that
> > >    something that is beautiful and makes sense on its own (without
> needing
> > >    to understand the history of how we got there). I think there may
> well
> > >    be such an explanation for the two API version (as you kind of said
> with
> > >    your HDFS analogy) but really making it clear how these two APIs are
> > >    different and how they interact is key. Like, basically I think we
> should
> > >    be able to explain it from scratch in such a way that it is obvious
> you'd
> > >    have these two things as the fundamental primitives for fetching
> data.
> >
> > I can see some arguments for having a single API.  One is that both
> > incremental and full fetch requests will travel along a similar code
> > path.  There will also be a lot of the same fields in both the request
> > and the response.  Separating the APIs means duplicating those fields
> > (like max_wait_time, min_bytes, isolation_level, etc.)
> >
> > The argument for having two APIs is that some fields will be be present
> > in incremental requests and not in full ones, and vice versa.  For
> > example, incremental requests will have a UUID, whereas full requests
> > will not.  And clearly, the interpretation of some fields will be a bit
> > different.  For example, incremental requests will only return
> > information about changed partitions, whereas full requests will return
> > information about all partitions in the request.
> >
> > On the whole, maybe having a single API makes more sense?  There really
> > would be a lot of duplicated fields if we split the APIs.
> >
> > best,
> > Colin
> >
> > >
> > > -Jay
> > >
> > > On Wed, Nov 22, 2017 at 11:02 AM, Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > On Tue, Nov 21, 2017, at 19:03, Jay Kreps wrote:
> > > > > I think the general thrust of this makes a ton of sense.
> > > > >
> > > > > I don't love that we're introducing a second type of fetch
> request. I
> > > > > think the motivation is for compatibility, right? But isn't that
> what
> > > > > versioning s for? Basically to me although the modification we're
> making
> > > > makes
> > > > > sense, the resulting protocol doesn't really seem like something
> you
> > > > would
> > > > > design this way from scratch.
> > > >
> > > > I think there are two big reasons to consider separating
> > > > IncrementalFetchRequest from FetchRequest.
> > > >
> > > > As you say, the first reason is compatibility.  We will have to
> support
> > > > the full FetchRequest for a long time to come because of our
> > > > compatibility policy.  It would be good from a code quality point of
> > > > view to avoid having widely diverging code paths for different
> versions
> > > > of this request.
> > > >
> > > > The other reason is that conceptually I feel that there should be
> both
> > > > full and incremental fetch requests.  This is similar to how HDFS has
> > > > both incremental and full block reports.  The full reports are
> necessary
> > > > when a node is restarted.  In HDFS, they also serve a periodic sanity
> > > > check if the DataNode's view of what blocks exist has become
> > > > desynchronized from the NameNode's view.  While in theory you could
> > > > avoid the sanity check, in practice it often was important.
> > > >
> > > > Also, just to be clear, I don't think we should convert
> KafkaConsumer to
> > > > using incremental fetch requests.  It seems inadvisable to allocate
> > > > broker memory for each KafkaConsumer.  After all, there can be quite
> a
> > > > few consumers, and we don't know ahead of time how many there will
> be.
> > > > This is very different than brokers, where there are a small,
> > > > more-or-less constant, number.  Also, consumers tend not to consume
> from
> > > > a massive number of topics all at once, so I don't think they have
> the
> > > > same problems with the existing FetchRequest RPC as followers do.
> > > >
> > > > >
> > > > > I think I may be misunderstanding the semantics of the partitions
> in
> > > > > IncrementalFetchRequest. I think the intention is that the server
> > > > > remembers the partitions you last requested, and the partitions you
> > > > specify
> > > > > in the request are added to this set. This is a bit odd though
> because
> > > > you can
> > > > > add partitions but I don't see how you remove them, so it doesn't
> really
> > > > let
> > > > > you fully make changes incrementally. I suspect I'm
> misunderstanding that
> > > > > somehow, though.
> > > >
> > > > Sorry, I may have done a poor job explaining the proposal.  The
> > > > intention is that you cannot change the set of partitions you are
> > > > receiving information about except by making a full FetchRequest.  If
> > > > you need to make any changes to the watch set whatsoever, you must
> make
> > > > a full request, not an incremental.  The idea is that changes are
> very
> > > > infrequent, so we don't need to optimize this at the moment.
> > > >
> > > > > You'd also need to be a little bit careful that there was
> > > > > no way for the server's idea of what the client is interested in
> and the
> > > > > client's idea to ever diverge as you made these modifications over
> time
> > > > > (due to bugs or whatever).
> > > > >
> > > > > It seems like an alternative would be to not add a second request,
> but
> > > > > instead change the fetch api and implementation
> > > > >
> > > > >    1. We save the partitions you last fetched on that connection
> in the
> > > > >    session for the connection (as I think you are proposing)
> > > > >    2. It only gives you back info on partitions that have data or
> have
> > > > >    changed (no reason you need the others, right?)
> > > > >    3. Not specifying any partitions means "give me the usual", as
> defined
> > > > >    by whatever you requested before attached to the session.
> > > > >
> > > > > This would be a new version of the fetch API, so compatibility
> would be
> > > > > retained by retaining the older version as is.
> > > > >
> > > > > This seems conceptually simpler to me. It's true that you have to
> resend
> > > > > the full set whenever you want to change it, but that actually
> seems less
> > > > > error prone and that should be rare.
> > > > >
> > > > > I suspect you guys thought about this and it doesn't quite work,
> but
> > > > > maybe you could explain why?
> > > >
> > > > I think your proposal is actually closer to what I was intending than
> > > > you thought.  Like I said above, I believe watch-set-change
> operations
> > > > should require a full fetch request.  It is certainly simpler to
> > > > implement and understand.
> > > >
> > > > If I understand your proposal correctly, you are suggesting that the
> > > > existing FetchRequest RPC should be able to do double duty as either
> a
> > > > full or an incremental request?
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <cmcc...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I created a KIP to improve the scalability and latency of
> FetchRequest:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
> > > > > > Partition+Scalability
> > > > > >
> > > > > > Please take a look.
> > > > > >
> > > > > > cheers,
> > > > > > Colin
> > > > > >
> > > >
>

Reply via email to