Hey Colin,

Thanks for the reply. Please see my comments inline.

On Sat, Nov 25, 2017 at 3:33 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Fri, Nov 24, 2017, at 22:06, Dong Lin wrote:
> > Hey Colin,
> >
> > Thanks for the reply! Please see my comment inline.
> >
> > On Fri, Nov 24, 2017 at 9:39 PM, Colin McCabe <cmcc...@apache.org>
> wrote:
> >
> > > On Thu, Nov 23, 2017, at 18:35, Dong Lin wrote:
> > > > Hey Colin,
> > > >
> > > > Thanks for the KIP! This is definitely useful when there are many idle
> > > > partitions in the clusters.
> > > >
> > > > Just in case it is useful, I will provide some numbers here. We observe
> > > > that for a cluster that has around 2.5k partitions per broker, the
> > > > ProduceRequestTotalTime average value is around 25 ms. For a cluster
> > > > with 2.5k partitions per broker whose AllTopicsBytesInRate is only
> > > > around 6 MB/s, the ProduceRequestTotalTime average value is around
> > > > 180 ms, most of which is spent on ProduceRequestRemoteTime. The increased
> > > > ProduceRequestTotalTime significantly reduces throughput of producers
> > > > with acks=all. I think this KIP can help address this problem.
> > >
> > > Hi Dong,
> > >
> > > Thanks for the numbers.  It's good to have empirical confirmation that
> > > this will help!
> > >
> > > >
> > > > Here are some of my ideas on the current KIP:
> > > >
> > > > - The KIP says that the follower will include a partition in
> > > > the IncrementalFetchRequest if the LEO of the partition has been updated.
> > > > It seems that doing so may prevent the leader from knowing information
> > > > (e.g. LogStartOffset) of the follower that would otherwise be included in
> > > > the FetchRequest. Maybe we should have a paragraph to explicitly define
> > > > the full criteria for when the fetcher should include a partition in the
> > > > FetchRequest and probably include logStartOffset as part of the
> > > > criteria?
> > >
> > > Hmm.  That's a good point... we should think about whether we need to
> > > send partition information in an incremental update when the LSO
> > > changes.
> > >
> > > Sorry if this is a dumb question, but what does the leader do with the
> > > logStartOffset of the followers?  When does the leader need to know it?
> > > Also, how often do we expect it to be changed by the LogCleaner?
> > >
> >
>
> Hi Dong,
>
> > The leader uses logStartOffset of the followers to determine the
> > logStartOffset of the partition. It is needed to handle
> > DeleteRecordsRequest. It can be changed if the log is deleted on the
> > follower due to log retention.
>
> Is there really a big advantage to the leader caching the LSO for each
> follower?  I guess it allows you to avoid sending the
> DeleteRecordsRequest to followers that you know have already deleted the
> records in question.  But the leader can just broadcast the request to
> all the followers.  This uses less network bandwidth than sending a
> single batch of records with acks=all.
>

This is probably not just about caching. The leader uses the LSO in the
FetchRequest from the follower to figure out whether a DeleteRecordsRequest
can succeed. Thus, if the follower does not include the partition in its
FetchRequest, the leader will not have the information needed for handling
the DeleteRecordsRequest. It is possible to change the procedure for handling
DeleteRecordsRequest. It is just that the KIP probably needs to specify the
change in more detail and we need to understand whether this is the best
approach.
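
To make the dependency concrete, here is a minimal Python sketch. It is not
Kafka's actual code: the class and method names are made up for illustration,
and I am assuming that a delete up to offset X counts as done once the leader
and every follower report a logStartOffset of at least X.

```python
class PartitionState:
    """Hypothetical leader-side bookkeeping for one partition."""

    def __init__(self, leader_log_start_offset, follower_ids):
        self.leader_log_start_offset = leader_log_start_offset
        # Last logStartOffset each follower reported; 0 until we hear from it.
        self.follower_log_start_offsets = {fid: 0 for fid in follower_ids}

    def on_fetch_request(self, follower_id, log_start_offset):
        # The leader only learns a follower's LSO when the partition
        # actually appears in that follower's FetchRequest.
        self.follower_log_start_offsets[follower_id] = log_start_offset

    def low_watermark(self):
        # Smallest logStartOffset across the leader and all followers.
        return min([self.leader_log_start_offset,
                    *self.follower_log_start_offsets.values()])

    def delete_records_done(self, target_offset):
        # Assumed completion rule: every replica has advanced far enough.
        return self.low_watermark() >= target_offset


state = PartitionState(leader_log_start_offset=100, follower_ids=[1, 2])
state.on_fetch_request(1, 100)
before = state.delete_records_done(100)  # follower 2 has not reported yet
state.on_fetch_request(2, 100)
after = state.delete_records_done(100)
print(before, after)
```

If follower 2 skipped this partition in its FetchRequest after its LSO moved,
the leader in this sketch would never see the update and the delete would
never be reported as done.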

IMO the work in this KIP can be divided into three parts:

1) the follower can skip a partition in the FetchRequest if the information of
that partition (i.e. those fields in FETCH_REQUEST_PARTITION_V5) has not
changed in comparison to the last FetchRequest from this follower.
2) the leader can skip a partition in the FetchResponse if the information
of that partition (i.e. those fields in FETCH_RESPONSE_PARTITION_V5) has
not changed in comparison to the last FetchResponse to this follower.
3) we can further skip a partition in the FetchRequest (or FetchResponse) if
the fields that have changed (e.g. LSO in the FetchRequest) do not need
to be sent.

It seems to me that 1) and 2) are the most important parts of the KIP. These
two parts are "safe" to do in the sense that no information will be lost
even if we skip these partitions in the FetchRequest/FetchResponse. It also
seems that these two parts can achieve the main goal of this KIP because if
a partition does not have active traffic, most likely the corresponding
fields in FETCH_REQUEST_PARTITION_V5 and FETCH_RESPONSE_PARTITION_V5 will
not change, and therefore this partition can be skipped in most
FetchRequest and FetchResponse.
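
As a rough illustration of part 1), here is a small Python sketch of how a
follower could pick the partitions to send. The names are hypothetical, and
the three fields are my reading of what FETCH_REQUEST_PARTITION_V5 carries per
partition, so please treat it as a sketch rather than the proposed
implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PartitionFetchState:
    """Per-partition request fields (assumed to mirror the V5 schema)."""
    fetch_offset: int
    log_start_offset: int
    max_bytes: int


def build_incremental_request(current, last_sent):
    """Keep only partitions whose fields differ from what was last sent.

    Both arguments map (topic, partition) -> PartitionFetchState.
    A partition that was never sent before is always included.
    """
    return {tp: state for tp, state in current.items()
            if last_sent.get(tp) != state}


last_sent = {
    ("t", 0): PartitionFetchState(8, 0, 1048576),
    ("t", 1): PartitionFetchState(5, 0, 1048576),
}
current = {
    ("t", 0): PartitionFetchState(10, 0, 1048576),  # fetch offset advanced
    ("t", 1): PartitionFetchState(5, 0, 1048576),   # idle partition
}
request = build_incremental_request(current, last_sent)
print(sorted(request))
```

Because the comparison covers every field, a change in the LSO alone is enough
to put the partition back in the request, which is why parts 1) and 2) do not
lose information.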

On the other hand, part 3) can possibly be a useful optimization but it
can also be a bit unsafe and requires more discussion. For example, if we
skip a partition in the FetchRequest when its LSO has changed, this can
potentially affect the handling of DeleteRecordsRequest. It is possible
that we can indeed make such an optimization. But we need to check whether
the cost saving of such an optimization is worth the potential complexity it
adds to the KIP. Similarly, the KIP probably needs to
explain why a change in other fields (e.g. HW in FetchResponse) does not
matter (or is not possible) so that a partition can be skipped in the
FetchResponse if this partition has no new records.

I am thinking that maybe we can focus on 1) and 2) first, and if the
expected performance is good enough, we won't have to do 3). It is just my
two cents. Does this sound reasonable?


>
> >
> >
> > >
> > > > - It seems that every time the set of partitions in the
> > > > ReplicaFetcherThread is changed, or if the follower restarts, a new UUID
> > > > will be generated in the leader and the leader will add a new entry in the
> > > > in-memory map to map the UUID to a list of partitions (and other metadata
> > > > such as fetch offset). This map will grow over time depending on the
> > > > frequency of events such as partition movement or broker restart. As you
> > > > mentioned, we probably need to time out entries in this map. But there is
> > > > also a tradeoff in this timeout -- a large timeout increases memory usage
> > > > whereas a smaller timeout increases the frequency of full FetchRequests.
> > > > Could you specify the default value of this timeout and probably also
> > > > explain how it affects the performance of this KIP?
> > >
> > > Right, there are definitely some tradeoffs here.
> > >
> > > Since fetches happen very frequently, I think even a short UUID cache
> > > expiration time of a minute or two should already be enough to ensure
> > > that 99%+ of all fetch requests are incremental fetch requests.  I think
> > > the idea of partitioning the cache per broker is a good one which will
> > > let us limit memory consumption even more.
> > >
> > > If replica fetcher threads do change their partition assignments often,
> > > we could also add a special "old UUID to uncache" field to the
> > > FetchRequest as well.  That would avoid having to wait for the full
> > > minute to clear the UUID cache.  That's probably not  necessary,
> > > though...
> > >
> >
> > I think an expiration time of a minute or two is probably reasonable. Yeah,
> > we can discuss it further after the KIP is updated. Thanks!
> >
> >
> > >
> > > > Also, do you think we can avoid having duplicate
> > > > entries from the same ReplicaFetcher (in case of partition set change)
> > > > by using brokerId+fetcherThreadIndex as the UUID?
> > >
> > > My concern about that is that if two messages get reordered somehow, or
> > > an update gets lost, the view of partitions which the fetcher thread has
> > > could diverge from the view which the leader has.  Also, UUIDs work for
> > > consumers, but clearly consumers cannot use a
> > > brokerID+fetcherThreadIndex.  It's simpler to have one system than two.
> > >
> >
> > Yeah this can be a problem if two messages are lost or reordered somehow.
> > I am just wondering whether there actually exists a scenario where the
> > messages can be reordered between the ReplicaFetcherThread and the leader.
> > My gut feeling is that since the ReplicaFetcherThread talks to the leader
> > using a single TCP connection with inflight requests = 1, out-of-order
> > delivery probably should not happen. I may be wrong though. What do you
> > think?
>
> It's not necessarily a single TCP connection, though... we re-establish
> the connection when required.  I also suspect that we don't always
> process requests strictly in the order they came in, due to using things
> like multiple worker threads that operate in parallel.
>

I agree with your comment that "It's simpler to have one system than two".
So that is probably a good reason for using a random UUID. We can examine
this later :)

I may be wrong, and it may actually be possible for a given
ReplicaFetcherThread to send FetchRequests out of order to the same leader.
But if we were to use this to rule out a possible optimization, it is
probably better if we know whether it can actually happen in practice. I am
not sure. Maybe other developers can comment on this.


> >
> >
> > >
> > > >
> > > > I agree with the previous comments that 1) ideally we want to evolve the
> > > > existing FetchRequest instead of adding a new request type; and
> > > > 2) the KIP hopefully can also apply to replication services such as
> > > > MirrorMaker. In addition, ideally we probably want to implement the new
> > > > logic in a separate class without having to modify the existing classes
> > > > (e.g. Log, LogManager) so that the implementation and design can be
> > > > simpler going forward. Motivated by these concepts, I am wondering if the
> > > > following alternative design may be worth thinking about.
> > > >
> > > > Here are the details of a potentially feasible alternative approach.
> > > >
> > > > *Protocol change: *
> > > >
> > > > - We add a fetcherId of string type in the FetchRequest. This fetcherId
> > > > is similar to the UUID and helps the leader correlate the fetcher (i.e.
> > > > ReplicaFetcherThread or MM consumer) with the state of the fetcher. This
> > > > fetcherId is determined by the fetcher. For most consumers this fetcherId
> > > > is null. For ReplicaFetcherThread this fetcherId = brokerId +
> > > > threadIndex. For MM this is groupId+someIndex.
> > >
> > > As Jay pointed out earlier, there are other consumers besides
> > > MirrorMaker that might want to take advantage of incremental fetch
> > > requests.  He gave the example of the HDFS connector, but there are
> many
> > > others that might want to follow a lot of partitions.  So I don't think
> > > we should special-case MirrorMaker.
> > >
> >
> > Yeah there are indeed many other use cases of replication. MM is just one
> > example.
> >
> >
> > >
> > > Also, I do not think that the consumer should choose the UUID.  If the
> > > consumer chooses the UUID, then multiple consumers may choose the same
> > > one, either maliciously or by accident.  We don't need to trust the
> > > client to choose a unique UUID, when the broker can simply choose one
> > > that it knows is unique.  This eliminates a class of bugs which we
> might
> > > otherwise encounter.
> > >
> >
> > The groupId is used to determine the partition assignment and clientId is
> > used to determine quota.
> >
> > It seems that trusting the UUID from the consumer has the same problem as
> > trusting the groupId and clientId from the consumer. For example, if a
> > consumer accidentally or maliciously used the same groupId/clientId as
> > another consumer, it can already cause problems for either the partition
> > assignment of the consumer group or the quota of the clientId. Both
> > problems are expected to be addressed with authentication. It seems OK to
> > treat the UUID with the same trust level as the groupId, and we can address
> > this problem with authentication as well. Does this sound reasonable?
>
> I agree that SASL should prevent clients from spoofing each other, when
> it is in use.  However, defense in depth is still a useful security
> principle.  There are also cases where clients could accidentally choose
> duplicate UUIDs, rather than maliciously: software bugs,
> misconfigurations, and so forth.  These things can have serious
> consequences when we're dealing with replication.  In any case, I don't
> think there's any advantage to letting the client choose the ID.


> >
> >
> > >
> > > >
> > > > *Proposed change in leader broker:*
> > > >
> > > > - A new class FetcherHandler will be used in the leader to map the
> > > > fetcherId to the state of the fetcher. The state of the fetcher is a
> > > > list of FETCH_REQUEST_PARTITION_V0 for selected partitions.
> > > >
> > > > - After the leader receives a FetchRequest, it first transforms the
> > > > FetchRequest by doing request = FetcherHandler.addPartition(request)
> > > > before giving this request to KafkaApis.handle(request). If the
> > > > fetcherId in this request is null, this method does not make any change.
> > > > Otherwise, it takes the list of FETCH_REQUEST_PARTITION_V0 associated
> > > > with this fetcherId and appends it to the given request. The state of a
> > > > new non-null fetcherId is an empty list.
> > > >
> > > > - KafkaApis.handle(request) will process the request and generate a
> > > > response. All existing logic in ReplicaManager, LogManager and so on
> > > > does not need to be changed.
> > > >
> > > > - The leader calls response = FetcherHandler.removePartition(response)
> > > > before sending the response back to the fetcher.
> > > > FetcherHandler.removePartition(response) enumerates all partitions in
> > > > the response. If a partition is "empty" (e.g. no records to be sent),
> > > > this partition and its FETCH_REQUEST_PARTITION_V0 in the original
> > > > FetchRequest are added to the state of this fetcherId, and this
> > > > partition is removed from the response. If the partition is not
> > > > "empty", the partition is removed from the state of this fetcherId.
> > > >
> > > > *Proposed change in the ReplicaFetcherThread:*
> > > >
> > > > - In addition to the set of assigned partitions, the
> > > > ReplicaFetcherThread also keeps track of the subset of assigned
> > > > partitions which were non-empty in the last FetchResponse. This subset is
> > > > initialized to be the set of assigned partitions. Then it is updated
> > > > every time a FetchResponse is received. The FetchRequest constructed by
> > > > the ReplicaFetcherThread includes exactly this subset of assigned
> > > > partitions.
> > > >
> > > > Here is how it works. Say there are 100 partitions (from 0 to 99) and
> > > > initially partition 0 has new data.
> > > >
> > > > - ReplicaFetcherThread will initially send FetchRequest for all 100
> > > > partitions.
> > > > - KafkaApis will return FetchResponse containing all 100 partitions.
> > > > Partition 0 has data but the other 99 partitions are empty.
> > > > - FetcherHandler will map this fetcherId to a list of 99 partitions
> > > > together with related fields in FETCH_REQUEST_PARTITION_V0, e.g. fetch
> > > > offset. FetcherHandler will then remove the 99 empty partitions from the
> > > > response so that the response only contains partition 0.
> > > > - ReplicaFetcherThread receives a response containing only partition 0.
> > > > The next FetchRequest will contain only partition 0.
> > > >
> > > > The design seems to work and can also handle the case where a partition
> > > > switches between active and inactive state. Do you think this would
> > > > address the concern in the previous email (e.g. evolve existing protocol)
> > > > properly?
> > >
> > > Thanks for the sketch-- it's very interesting.
> > >
> > > Hmm.   A lot of this sounds like implementation details which are
> > > probably better to discuss in the JIRA.  Also, it's not clear to me why
> > > avoiding changes to existing classes (such as Log) is desirable-- is
> > > there a specific concern you have here?
> >
> >
> > Yeah this is indeed implementation detail. I am providing this mostly to
> > show that it may be possible to evolve the existing FetchRequest without
> > having two code paths for it, which can probably address the concern that
> > you mentioned earlier with evolving the FetchRequest. We can probably
> > discuss further after the KIP is updated to combine the two requests.
>
> Sounds good.
>
> best,
> Colin
>
> >
> > Thanks,
> > Dong
> >
> >
> > >
> >
> >
> > > Thanks for the feedback about re-using the existing FetchRequest.  When
> > > I update the KIP, I will combine the two requests, like you and Jay
> > > suggested.  I think it will avoid some duplication.
> > >
> > > cheers,
> > > Colin
> > >
> > > >
> > > > Thanks!
> > > > Dong
> > > >
> > > >
> > > > On Thu, Nov 23, 2017 at 2:12 PM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Ismael,
> > > > >
> > > > > Yes, you are right. The metadata may not help for multiple fetch
> > > thread or
> > > > > the consumer case. Session based approach is probably better in
> this
> > > case.
> > > > >
> > > > > The optimization of only returning data at the offset index entry
> > > boundary
> > > > > may still be worth considering. It also helps improve the index
> lookup
> > > in
> > > > > general.
> > > > >
> > > > > @Jun,
> > > > > Good point of log compacted topics. Perhaps we can make sure the
> read
> > > will
> > > > > always be operated on the original segment file even if a
> compacted log
> > > > > segment is swapped in. Combining this with the above solution which
> > > always
> > > > > returns the data at the index boundary when possible, it seems we
> can
> > > avoid
> > > > > the additional look up safely.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Thu, Nov 23, 2017 at 9:31 AM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Yes, caching the log segment position after the index lookup may
> > > work.
> > > > > One
> > > > > > subtle issue is that for a compacted topic, the underlying log
> > > segment
> > > > > may
> > > > > > have changed between two consecutive fetch requests, and we need
> to
> > > think
> > > > > > through the impact of that.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Nov 22, 2017 at 7:54 PM, Colin McCabe <
> cmcc...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Oh, I see the issue now.  The broker uses sendfile() and sends
> some
> > > > > > > message data without knowing what the ending offset is.  To
> learn
> > > that,
> > > > > > we
> > > > > > > would need another index access.
> > > > > > >
> > > > > > > However, when we do that index->offset lookup, we know that the
> > > next
> > > > > > > offset->index lookup (done in the following fetch request)
> will be
> > > for
> > > > > > the
> > > > > > > same offset.  So we should be able to cache the result (the
> index).
> > > > > > Also:
> > > > > > > Does the operating system’s page cache help us here?
> > > > > > >
> > > > > > > Best,
> > > > > > > Colin
> > > > > > >
> > > > > > > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> > > > > > > > Hi, Colin,
> > > > > > > >
> > > > > > > > After step 3a, do we need to update the cached offset in the
> > > leader
> > > > > to
> > > > > > be
> > > > > > > > the last offset in the data returned in the fetch response?
> If
> > > so, we
> > > > > > > > need
> > > > > > > > another offset index lookup since the leader only knows that
> it
> > > gives
> > > > > > out
> > > > > > > > X
> > > > > > > > bytes in the fetch response, but not the last offset in
> those X
> > > > > bytes.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe <
> > > cmcc...@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > > > > > > > Hi, Colin,
> > > > > > > > > >
> > > > > > > > > > When fetching data for a partition, the leader needs to
> > > translate
> > > > > > the
> > > > > > > > > > fetch offset to a position in a log segment with an index
> > > lookup.
> > > > > > If
> > > > > > > the
> > > > > > > > > fetch
> > > > > > > > > > request now also needs to cache the offset for the next
> fetch
> > > > > > > request,
> > > > > > > > > > there will be an extra offset index lookup.
> > > > > > > > >
> > > > > > > > > > Hmm.  So the way I was thinking about it was, with an incremental
> > > > > > > > > > fetch request, for each partition:
> > > > > > > > > >
> > > > > > > > > > 1a. the leader consults its cache to find the offset it needs to
> > > > > > > > > > use for the fetch request
> > > > > > > > > > 2a. the leader performs a lookup to translate the offset to a file
> > > > > > > > > > index
> > > > > > > > > > 3a. the leader reads the data from the file
> > > > > > > > > >
> > > > > > > > > > In contrast, with a full fetch request, for each partition:
> > > > > > > > > >
> > > > > > > > > > 1b. the leader looks at the FetchRequest to find the offset it
> > > > > > > > > > needs to use for the fetch request
> > > > > > > > > > 2b. the leader performs a lookup to translate the offset to a file
> > > > > > > > > > index
> > > > > > > > > > 3b. the leader reads the data from the file
> > > > > > > > > >
> > > > > > > > > > It seems like there is only one offset index lookup in both cases?
> > > > > > > > > > The key point is that the cache in step #1a is not stored on disk.
> > > > > > > > > > Or maybe I'm missing something here.
> > > > > > > > >
> > > > > > > > > best,
> > > > > > > > > Colin
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > The offset index lookup can
> > > > > > > > > > potentially be expensive since it could require disk
> I/Os.
> > > One
> > > > > way
> > > > > > to
> > > > > > > > > > optimize this a bit is to further cache the log segment
> > > position
> > > > > > for
> > > > > > > the
> > > > > > > > > > next offset. The tricky issue is that for a compacted
> topic,
> > > the
> > > > > > > > > > underlying
> > > > > > > > > > log segment could have changed between two consecutive
> fetch
> > > > > > > requests. We
> > > > > > > > > > could potentially make that case work, but the logic
> will be
> > > more
> > > > > > > > > > complicated.
> > > > > > > > > >
> > > > > > > > > > Another thing is that it seems that the proposal only
> saves
> > > the
> > > > > > > metadata
> > > > > > > > > > overhead if there are low volume topics. If we use Jay's
> > > > > suggestion
> > > > > > > of
> > > > > > > > > > including 0 partitions in subsequent fetch requests, it
> seems
> > > > > that
> > > > > > we
> > > > > > > > > > could
> > > > > > > > > > get the metadata saving even if all topics have
> continuous
> > > > > traffic.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <
> > > > > cmcc...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > > > > > > > > > Hi, Jay,
> > > > > > > > > > > >
> > > > > > > > > > > > I guess in your proposal the leader has to cache the
> last
> > > > > > offset
> > > > > > > > > given
> > > > > > > > > > > > back for each partition so that it knows from which
> > > offset to
> > > > > > > serve
> > > > > > > > > the
> > > > > > > > > > > next
> > > > > > > > > > > > fetch request.
> > > > > > > > > > >
> > > > > > > > > > > Hi Jun,
> > > > > > > > > > >
> > > > > > > > > > > Just to clarify, the leader has to cache the last
> offset
> > > for
> > > > > each
> > > > > > > > > > > follower / UUID in the original KIP-227 proposal as
> well.
> > > > > Sorry
> > > > > > if
> > > > > > > > > that
> > > > > > > > > > > wasn't clear.
> > > > > > > > > > >
> > > > > > > > > > > > This is doable but it means that the leader needs to
> do
> > > an
> > > > > > > > > > > > additional index lookup per partition to serve a
> fetch
> > > > > request.
> > > > > > > Not
> > > > > > > > > sure
> > > > > > > > > > > > if the benefit from the lighter fetch request
> obviously
> > > > > offsets
> > > > > > > the
> > > > > > > > > > > > additional index lookup though.
> > > > > > > > > > >
> > > > > > > > > > > The runtime impact should be a small constant factor at
> > > most,
> > > > > > > right?
> > > > > > > > > > > You would just have a mapping between UUID and the
> latest
> > > > > offset
> > > > > > in
> > > > > > > > > each
> > > > > > > > > > > partition data structure.  It seems like the runtime
> > > impact of
> > > > > > > looking
> > > > > > > > > > > up the fetch offset in a hash table (or small array)
> in the
> > > > > > > in-memory
> > > > > > > > > > > partition data structure should be very similar to the
> > > runtime
> > > > > > > impact
> > > > > > > > > of
> > > > > > > > > > > looking up the fetch offset in the FetchRequest.
> > > > > > > > > > >
> > > > > > > > > > > The extra memory consumption per partition is
> > > O(num_brokers),
> > > > > > > which is
> > > > > > > > > > > essentially a small constant.  (The fact that brokers
> can
> > > have
> > > > > > > multiple
> > > > > > > > > > > UUIDs due to parallel fetches is a small wrinkle.  But
> we
> > > can
> > > > > > > place an
> > > > > > > > > > > upper bound on the number of UUIDs permitted per
> broker.)
> > > > > > > > > > >
> > > > > > > > > > > best,
> > > > > > > > > > > Colin
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <
> > > j...@confluent.io
> > > > > >
> > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I think the general thrust of this makes a ton of
> > > sense.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I don't love that we're introducing a second type
> of
> > > fetch
> > > > > > > > > request. I
> > > > > > > > > > > think
> > > > > > > > > > > > > the motivation is for compatibility, right? But
> isn't
> > > that
> > > > > > what
> > > > > > > > > > > versioning
> > > > > > > > > > > > > is for? Basically to me although the modification
> we're
> > > > > > making
> > > > > > > > > makes
> > > > > > > > > > > sense,
> > > > > > > > > > > > > the resulting protocol doesn't really seem like
> > > something
> > > > > you
> > > > > > > would
> > > > > > > > > > > design
> > > > > > > > > > > > > this way from scratch.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think I may be misunderstanding the semantics of
> the
> > > > > > > partitions
> > > > > > > > > in
> > > > > > > > > > > > > IncrementalFetchRequest. I think the intention is
> that
> > > the
> > > > > > > server
> > > > > > > > > > > remembers
> > > > > > > > > > > > > the partitions you last requested, and the
> partitions
> > > you
> > > > > > > specify
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > > > request are added to this set. This is a bit odd
> though
> > > > > > > because you
> > > > > > > > > > > can add
> > > > > > > > > > > > > partitions but I don't see how you remove them, so
> it
> > > > > doesn't
> > > > > > > > > really
> > > > > > > > > > > let
> > > > > > > > > > > > > you fully make changes incrementally. I suspect I'm
> > > > > > > > > misunderstanding
> > > > > > > > > > > that
> > > > > > > > > > > > > somehow, though. You'd also need to be a little bit
> > > careful
> > > > > > > that
> > > > > > > > > there
> > > > > > > > > > > was
> > > > > > > > > > > > > no way for the server's idea of what the client is
> > > > > interested
> > > > > > > in
> > > > > > > > > and
> > > > > > > > > > > the
> > > > > > > > > > > > > client's idea to ever diverge as you made these
> > > > > modifications
> > > > > > > over
> > > > > > > > > time
> > > > > > > > > > > > > (due to bugs or whatever).
> > > > > > > > > > > > >
> > > > > > > > > > > > > > It seems like an alternative would be to not add a second request,
> > > > > > > > > > > > > > but instead change the fetch api and implementation
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >    1. We save the partitions you last fetched on that connection in
> > > > > > > > > > > > > >    the session for the connection (as I think you are proposing)
> > > > > > > > > > > > > >    2. It only gives you back info on partitions that have data or
> > > > > > > > > > > > > >    have changed (no reason you need the others, right?)
> > > > > > > > > > > > > >    3. Not specifying any partitions means "give me the usual", as
> > > > > > > > > > > > > >    defined by whatever you requested before attached to the session.
> > > > > > > > > > > > >
> > > > > > > > > > > > > This would be a new version of the fetch API, so
> > > > > > compatibility
> > > > > > > > > would be
> > > > > > > > > > > > > retained by retaining the older version as is.
> > > > > > > > > > > > >
> > > > > > > > > > > > > This seems conceptually simpler to me. It's true
> that
> > > you
> > > > > > have
> > > > > > > to
> > > > > > > > > > > resend
> > > > > > > > > > > > > the full set whenever you want to change it, but
> that
> > > > > > actually
> > > > > > > > > seems
> > > > > > > > > > > less
> > > > > > > > > > > > > error prone and that should be rare.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I suspect you guys thought about this and it
> doesn't
> > > quite
> > > > > > > work,
> > > > > > > > > but
> > > > > > > > > > > maybe
> > > > > > > > > > > > > you could explain why?
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Jay
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <
> > > > > > > cmcc...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I created a KIP to improve the scalability and latency of
> > > > > > > > > > > > > > > FetchRequest:
> > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Please take a look.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > cheers,
> > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
>
