Hey Colin,

Thanks for the KIP! This is definitely useful when there are many idle
partitions in the clusters.

Just in case it is useful, I will provide some numbers here. We observe that
for a cluster that has around 2.5k partitions per broker, the
ProduceRequestTotalTime average value is around 25 ms. For a cluster with
2.5k partitions per broker whose AllTopicsBytesInRate is only around 6
MB/s, the ProduceRequestTotalTime average value is around 180 ms, most of
which is spent on ProduceRequestRemoteTime. The increased
ProduceRequestTotalTime significantly reduces throughput of producers with
acks=all. I think this KIP can help address this problem.

Here are some of my ideas on the current KIP:

- The KIP says that the follower will include a partition in
the IncrementalFetchRequest if the LEO of the partition has been updated.
It seems that doing so may prevent the leader from learning information
(e.g. LogStartOffset) about the follower that would otherwise be included
in the FetchRequest. Maybe we should have a paragraph that explicitly
defines the full criteria for when the fetcher should include a partition
in the FetchRequest, and probably include logStartOffset as part of the
criteria?

- It seems that every time the set of partitions in the
ReplicaFetcherThread changes, or the follower restarts, a new UUID will
be generated in the leader, and the leader will add a new entry to the
in-memory map from the UUID to the list of partitions (and other metadata
such as fetch offset). This map will grow over time depending on the
frequency of events such as partition movement or broker restart. As you
mentioned, we probably need to time out entries in this map. But there is
also a tradeoff in this timeout -- a larger timeout increases memory usage
whereas a smaller timeout increases the frequency of full FetchRequests.
Could you specify the default value of this timeout and probably also
explain how it affects the performance of this KIP? Also, do you think we
can avoid having duplicate entries from the same ReplicaFetcher (in case of
a partition set change) by using brokerId+fetcherThreadIndex as the UUID?
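
To make the tradeoff above concrete, here is a minimal sketch of such a
map. It is not taken from the KIP: the class name FetcherStateCache, the
timeout_s parameter and the injectable clock are all hypothetical. It only
illustrates that keying on (brokerId, fetcherThreadIndex) overwrites the old
entry when the partition set changes, and that an expired entry forces the
follower back to a full FetchRequest.

```python
import time
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical key: (brokerId, fetcherThreadIndex) instead of a random UUID.
FetcherKey = Tuple[int, int]


class FetcherStateCache:
    """Leader-side map from fetcher key to its partition list, with expiry."""

    def __init__(self, timeout_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout_s = timeout_s
        self._clock = clock
        # key -> (time of last update, partitions tracked for this fetcher)
        self._entries: Dict[FetcherKey, Tuple[float, List[int]]] = {}

    def put(self, broker_id: int, thread_index: int,
            partitions: List[int]) -> None:
        # A changed partition set replaces the old entry instead of adding
        # a second one, because the key is stable across changes.
        self._entries[(broker_id, thread_index)] = (self._clock(),
                                                    list(partitions))

    def get(self, broker_id: int, thread_index: int) -> Optional[List[int]]:
        # None means "no state": the follower must send a full FetchRequest.
        self.expire()
        entry = self._entries.get((broker_id, thread_index))
        return None if entry is None else entry[1]

    def expire(self) -> None:
        # Drop entries that have not been updated within the timeout.
        now = self._clock()
        stale = [key for key, (updated, _) in self._entries.items()
                 if now - updated > self._timeout_s]
        for key in stale:
            del self._entries[key]

    def __len__(self) -> int:
        return len(self._entries)
```

A larger timeout_s keeps more entries alive (more memory); a smaller one
makes get() return None more often (more full FetchRequests).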

I agree with the previous comments that 1) ideally we want to evolve the
existing FetchRequest instead of adding a new request type; and 2) the
KIP hopefully can also apply to replication services such as
MirrorMaker. In addition, ideally we probably want to implement the new
logic in a separate class without having to modify existing classes (e.g.
Log, LogManager) so that the implementation and design can be simpler going
forward. Motivated by these ideas, I am wondering if the following
alternative design may be worth considering.

Here are the details of a potentially feasible alternative approach.

*Protocol change:*

- We add a fetcherId of string type to the FetchRequest. This fetcherId is
similar to the UUID and helps the leader correlate the fetcher (i.e.
ReplicaFetcherThread or MM consumer) with the state of the fetcher. This
fetcherId is determined by the fetcher. For most consumers this fetcherId
is null. For ReplicaFetcherThread this fetcherId = brokerId + threadIndex.
For MM this is groupId+someIndex.

*Proposed change in leader broker:*

- A new class FetcherHandler will be used in the leader to map the
fetcherId to the state of the fetcher. The state of the fetcher is a list
of FETCH_REQUEST_PARTITION_V0 for selected partitions.

- After the leader receives a FetchRequest, it first transforms the
FetchRequest by doing request = FetcherHandler.addPartition(request) before
giving this request to KafkaApis.handle(request). If the fetcherId in
this request is null, this method does not make any change. Otherwise, it
takes the list of FETCH_REQUEST_PARTITION_V0 associated with this fetcherId
and appends it to the given request. The state of a new non-null fetcherId
is an empty list.

- The KafkaApis.handle(request) will process the request and generate a
response. All existing logic in ReplicaManager, LogManager and so on does
not need to be changed.

- The leader calls response = FetcherHandler.removePartition(response)
before sending the response back to the fetcher.
FetcherHandler.removePartition(response) enumerates all partitions in the
response. If a partition is "empty" (e.g. no records to be sent), this
partition and its FETCH_REQUEST_PARTITION_V0 in the original FetchRequest
are added to the state of this fetcherId, and this partition is removed
from the response. If the partition is not "empty", the partition is
removed from the state of this fetcherId.

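A rough sketch of the leader-side FetcherHandler described above may help.
This is only an illustration under stated assumptions: the PartitionFetch,
FetchRequest and FetchResponse shapes are simplified stand-ins, not the real
Kafka classes, and remove_partition takes the transformed request as a
second argument (an assumption of this sketch) so that it can look up the
FETCH_REQUEST_PARTITION_V0 entry of each empty partition.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class PartitionFetch:
    """Simplified stand-in for one FETCH_REQUEST_PARTITION_V0 entry."""
    partition: int
    fetch_offset: int


@dataclass
class FetchRequest:
    fetcher_id: Optional[str]
    partitions: List[PartitionFetch]


@dataclass
class FetchResponse:
    fetcher_id: Optional[str]
    records: Dict[int, List[bytes]]  # partition -> records (empty list = idle)


class FetcherHandler:
    def __init__(self) -> None:
        # fetcherId -> {partition -> cached request entry for idle partitions}
        self._state: Dict[str, Dict[int, PartitionFetch]] = {}

    def add_partition(self, request: FetchRequest) -> FetchRequest:
        if request.fetcher_id is None:
            return request  # ordinary consumer: no change
        idle = self._state.setdefault(request.fetcher_id, {})
        explicit = {p.partition for p in request.partitions}
        # Append cached idle partitions that the fetcher did not list itself.
        extra = [p for n, p in idle.items() if n not in explicit]
        return FetchRequest(request.fetcher_id, request.partitions + extra)

    def remove_partition(self, request: FetchRequest,
                         response: FetchResponse) -> FetchResponse:
        if response.fetcher_id is None:
            return response
        idle = self._state.setdefault(response.fetcher_id, {})
        by_partition = {p.partition: p for p in request.partitions}
        kept: Dict[int, List[bytes]] = {}
        for partition, records in response.records.items():
            if records:
                idle.pop(partition, None)   # active again: drop from state
                kept[partition] = records
            else:
                idle[partition] = by_partition[partition]  # remember as idle
        return FetchResponse(response.fetcher_id, kept)
```

In this sketch KafkaApis.handle(request) would run between the two calls,
unchanged, on the request returned by add_partition.
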
*Proposed change in the ReplicaFetcherThread:*

- In addition to the set of assigned partitions, the ReplicaFetcherThread
also keeps track of the subset of assigned partitions that were non-empty
in the last FetchResponse. This subset is initialized to be the set of
assigned partitions. Then it is updated every time a FetchResponse is
received. The FetchRequest constructed by ReplicaFetcherThread includes
exactly this subset of assigned partitions.

Here is how it works. Say there are 100 partitions (from 0 to 99) and
initially partition 0 has new data.

- ReplicaFetcherThread will initially send FetchRequest for all 100
partitions.
- KafkaApis will return FetchResponse containing all 100 partitions.
Partition 0 has data but the other 99 partitions are empty.
- FetcherHandler will map this fetcherId to a list of 99 partitions
together with related fields in FETCH_REQUEST_PARTITION_V0, e.g. fetch
offset. FetcherHandler will then remove the 99 empty partitions from the
response so that response only contains partition 0.
- ReplicaFetcherThread receives a response containing only partition 0. The
next FetchRequest will contain only partition 0.
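
The walk-through above can be reproduced with a small simulation. This is a
sketch only: partitions are reduced to integer ids, fetch offsets are
omitted, and the function names leader_round and simulate are made up for
illustration. A second round where only partition 7 has data is added to
show a partition switching from inactive to active.

```python
from typing import List, Set

NUM_PARTITIONS = 100


def leader_round(idle: Set[int], requested: Set[int],
                 has_data: Set[int]) -> Set[int]:
    """One leader-side round. Updates `idle` in place and returns the
    partitions that remain in the (pruned) response."""
    served = requested | idle            # effect of addPartition
    non_empty = served & has_data        # partitions with records to send
    idle.difference_update(non_empty)    # active partitions leave the state
    idle.update(served - non_empty)      # empty partitions join the state
    return non_empty


def simulate(rounds: List[Set[int]]) -> List[Set[int]]:
    """Return the partition set of each FetchRequest the follower sends."""
    idle: Set[int] = set()
    next_request = set(range(NUM_PARTITIONS))   # first request is full
    sent: List[Set[int]] = []
    for has_data in rounds:
        sent.append(set(next_request))
        # The follower's next request lists only the partitions that were
        # non-empty in the response it just received.
        next_request = leader_round(idle, next_request, has_data)
    return sent


if __name__ == "__main__":
    requests = simulate([{0}, {7}, set()])
    print([len(r) for r in requests])
```

Here the second request lists only partition 0 and the third only
partition 7, while the leader still serves all 100 partitions internally.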

The design seems to work and can also handle the case where a partition
switches between active and inactive states. Do you think this would
address the concern in the previous email (e.g. evolve existing protocol)
properly?

Thanks!
Dong


On Thu, Nov 23, 2017 at 2:12 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Ismael,
>
> Yes, you are right. The metadata may not help for multiple fetch thread or
> the consumer case. Session based approach is probably better in this case.
>
> The optimization of only returning data at the offset index entry boundary
> may still be worth considering. It also helps improve the index lookup in
> general.
>
> @Jun,
> Good point of log compacted topics. Perhaps we can make sure the read will
> always be operated on the original segment file even if a compacted log
> segment is swapped in. Combining this with the above solution which always
> returns the data at the index boundary when possible, it seems we can avoid
> the additional look up safely.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Thu, Nov 23, 2017 at 9:31 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Yes, caching the log segment position after the index lookup may work. One
> > subtle issue is that for a compacted topic, the underlying log segment may
> > have changed between two consecutive fetch requests, and we need to think
> > through the impact of that.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Nov 22, 2017 at 7:54 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > Oh, I see the issue now.  The broker uses sendfile() and sends some
> > > message data without knowing what the ending offset is.  To learn that,
> > > we would need another index access.
> > >
> > > However, when we do that index->offset lookup, we know that the next
> > > offset->index lookup (done in the following fetch request) will be for
> > > the same offset.  So we should be able to cache the result (the index).
> > > Also: Does the operating system’s page cache help us here?
> > >
> > > Best,
> > > Colin
> > >
> > > On Wed, Nov 22, 2017, at 16:53, Jun Rao wrote:
> > > > Hi, Colin,
> > > >
> > > > After step 3a, do we need to update the cached offset in the leader to
> > > > be the last offset in the data returned in the fetch response? If so,
> > > > we need another offset index lookup since the leader only knows that it
> > > > gives out X bytes in the fetch response, but not the last offset in
> > > > those X bytes.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Nov 22, 2017 at 4:01 PM, Colin McCabe <cmcc...@apache.org> wrote:
> > > >
> > > > > On Wed, Nov 22, 2017, at 14:09, Jun Rao wrote:
> > > > > > Hi, Colin,
> > > > > >
> > > > > > When fetching data for a partition, the leader needs to translate the
> > > > > > fetch offset to a position in a log segment with an index lookup. If
> > > > > > the fetch request now also needs to cache the offset for the next
> > > > > > fetch request, there will be an extra offset index lookup.
> > > > >
> > > > > Hmm.  So the way I was thinking about it was, with an incremental fetch
> > > > > request, for each partition:
> > > > >
> > > > > 1a. the leader consults its cache to find the offset it needs to use
> > > > > for the fetch request
> > > > > 2a. the leader performs a lookup to translate the offset to a file index
> > > > > 3a. the leader reads the data from the file
> > > > >
> > > > > In contrast, with a full fetch request, for each partition:
> > > > >
> > > > > 1b. the leader looks at the FetchRequest to find the offset it needs
> > > > > to use for the fetch request
> > > > > 2b. the leader performs a lookup to translate the offset to a file index
> > > > > 3b. the leader reads the data from the file
> > > > >
> > > > > It seems like there is only one offset index lookup in both cases?  The
> > > > > key point is that the cache in step #1a is not stored on disk.  Or
> > > > > maybe I'm missing something here.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > > The offset index lookup can
> > > > > > potentially be expensive since it could require disk I/Os. One
> way
> > to
> > > > > > optimize this a bit is to further cache the log segment position
> > for
> > > the
> > > > > > next offset. The tricky issue is that for a compacted topic, the
> > > > > > underlying
> > > > > > log segment could have changed between two consecutive fetch
> > > requests. We
> > > > > > could potentially make that case work, but the logic will be more
> > > > > > complicated.
> > > > > >
> > > > > > Another thing is that it seems that the proposal only saves the
> > > metadata
> > > > > > overhead if there are low volume topics. If we use Jay's
> suggestion
> > > of
> > > > > > including 0 partitions in subsequent fetch requests, it seems
> that
> > we
> > > > > > could
> > > > > > get the metadata saving even if all topics have continuous
> traffic.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Wed, Nov 22, 2017 at 1:14 PM, Colin McCabe <
> cmcc...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > On Tue, Nov 21, 2017, at 22:11, Jun Rao wrote:
> > > > > > > > Hi, Jay,
> > > > > > > >
> > > > > > > > I guess in your proposal the leader has to cache the last
> > offset
> > > > > given
> > > > > > > > back for each partition so that it knows from which offset to
> > > serve
> > > > > the
> > > > > > > next
> > > > > > > > fetch request.
> > > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Just to clarify, the leader has to cache the last offset for
> each
> > > > > > > follower / UUID in the original KIP-227 proposal as well.
> Sorry
> > if
> > > > > that
> > > > > > > wasn't clear.
> > > > > > >
> > > > > > > > This is doable but it means that the leader needs to do an
> > > > > > > > additional index lookup per partition to serve a fetch
> request.
> > > Not
> > > > > sure
> > > > > > > > if the benefit from the lighter fetch request obviously
> offsets
> > > the
> > > > > > > > additional index lookup though.
> > > > > > >
> > > > > > > The runtime impact should be a small constant factor at most,
> > > right?
> > > > > > > You would just have a mapping between UUID and the latest
> offset
> > in
> > > > > each
> > > > > > > partition data structure.  It seems like the runtime impact of
> > > looking
> > > > > > > up the fetch offset in a hash table (or small array) in the
> > > in-memory
> > > > > > > partition data structure should be very similar to the runtime
> > > impact
> > > > > of
> > > > > > > looking up the fetch offset in the FetchRequest.
> > > > > > >
> > > > > > > The extra memory consumption per partition is O(num_brokers),
> > > which is
> > > > > > > essentially a small constant.  (The fact that brokers can have
> > > multiple
> > > > > > > UUIDs due to parallel fetches is a small wrinkle.  But we can
> > > place an
> > > > > > > upper bound on the number of UUIDs permitted per broker.)
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Tue, Nov 21, 2017 at 7:03 PM, Jay Kreps <j...@confluent.io
> >
> > > wrote:
> > > > > > > >
> > > > > > > > > I think the general thrust of this makes a ton of sense.
> > > > > > > > >
> > > > > > > > > I don't love that we're introducing a second type of fetch
> > > > > request. I
> > > > > > > think
> > > > > > > > > the motivation is for compatibility, right? But isn't that
> > what
> > > > > > > versioning
> > > > > > > > > is for? Basically to me although the modification we're
> > making
> > > > > makes
> > > > > > > sense,
> > > > > > > > > the resulting protocol doesn't really seem like something
> you
> > > would
> > > > > > > design
> > > > > > > > > this way from scratch.
> > > > > > > > >
> > > > > > > > > I think I may be misunderstanding the semantics of the
> > > partitions
> > > > > in
> > > > > > > > > IncrementalFetchRequest. I think the intention is that the
> > > server
> > > > > > > remembers
> > > > > > > > > the partitions you last requested, and the partitions you
> > > specify
> > > > > in
> > > > > > > the
> > > > > > > > > request are added to this set. This is a bit odd though
> > > because you
> > > > > > > can add
> > > > > > > > > partitions but I don't see how you remove them, so it
> doesn't
> > > > > really
> > > > > > > let
> > > > > > > > > you fully make changes incrementally. I suspect I'm
> > > > > misunderstanding
> > > > > > > that
> > > > > > > > > somehow, though. You'd also need to be a little bit careful
> > > that
> > > > > there
> > > > > > > was
> > > > > > > > > no way for the server's idea of what the client is
> interested
> > > in
> > > > > and
> > > > > > > the
> > > > > > > > > client's idea to ever diverge as you made these
> modifications
> > > over
> > > > > time
> > > > > > > > > (due to bugs or whatever).
> > > > > > > > >
> > > > > > > > > It seems like an alternative would be to not add a second
> > > request,
> > > > > but
> > > > > > > > > instead change the fetch api and implementation
> > > > > > > > >
> > > > > > > > >    1. We save the partitions you last fetched on that
> > > connection
> > > > > in the
> > > > > > > > >    session for the connection (as I think you are
> proposing)
> > > > > > > > >    2. It only gives you back info on partitions that have
> > data
> > > or
> > > > > have
> > > > > > > > >    changed (no reason you need the others, right?)
> > > > > > > > >    3. Not specifying any partitions means "give me the
> > usual",
> > > as
> > > > > > > defined
> > > > > > > > >    by whatever you requested before attached to the
> session.
> > > > > > > > >
> > > > > > > > > This would be a new version of the fetch API, so
> > compatibility
> > > > > would be
> > > > > > > > > retained by retaining the older version as is.
> > > > > > > > >
> > > > > > > > > This seems conceptually simpler to me. It's true that you
> > have
> > > to
> > > > > > > resend
> > > > > > > > > the full set whenever you want to change it, but that
> > actually
> > > > > seems
> > > > > > > less
> > > > > > > > > error prone and that should be rare.
> > > > > > > > >
> > > > > > > > > I suspect you guys thought about this and it doesn't quite
> > > work,
> > > > > but
> > > > > > > maybe
> > > > > > > > > you could explain why?
> > > > > > > > >
> > > > > > > > > -Jay
> > > > > > > > >
> > > > > > > > > On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe <
> > > cmcc...@apache.org>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > > I created a KIP to improve the scalability and latency of FetchRequest:
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > > > > >
> > > > > > > > > > Please take a look.
> > > > > > > > > >
> > > > > > > > > > cheers,
> > > > > > > > > > Colin
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> > >
> >
>
