Hi,

Good catch about the rotation. This is probably not too big a blocker.
Plenty of ideas spring to mind for how it could be done; maybe we could
even offer different algorithms here (nothing, random shuffle, the client
sends a bitmask of what it wants fetched first, broker-side logic... many more).

Thank you for considering my ideas. I am fairly convinced we don't need
to aim for a 100% empty fetch request across TCP sessions. Maybe my ideas
offer decent trade-offs.

Best Jan





On 01.12.2017 08:43, Becket Qin wrote:
Hi Jan,

I agree that we probably don't want to make the protocol too complicated
just for exception cases.

The current FetchRequest contains an ordered list of partitions that may
rotate based on priority. Therefore it is somewhat difficult to do the
order matching. But you brought up a good point about ordering: we may want
to migrate the rotation logic from the clients to the server. I am not sure
whether this will introduce much complexity to the broker; intuitively it
seems fine. The logic would basically be similar to the draining logic in
the RecordAccumulator of the producer.
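
To make that concrete, here is a minimal sketch of what server-side
rotation in the spirit of the RecordAccumulator's round-robin draining
might look like. The class and method names are hypothetical, not actual
broker code.

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the broker keeps a rotating start index per fetch
// session and returns partitions round-robin, so no partition is starved
// at the tail of every response.
class FetchSessionRotation<P> {
    private int drainStart = 0;  // rotating start index, like RecordAccumulator's drainIndex

    // Returns the session's partitions in rotated order and advances the
    // start index for the next request.
    synchronized List<P> rotated(List<P> partitions) {
        int n = partitions.size();
        List<P> ordered = new ArrayList<>(n);
        if (n == 0) {
            return ordered;
        }
        int start = drainStart % n;
        for (int i = 0; i < n; i++) {
            ordered.add(partitions.get((start + i) % n));
        }
        drainStart = (start + 1) % n;
        return ordered;
    }
}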

Thanks,

Jiangjie (Becket) Qin

On Thu, Nov 30, 2017 at 11:29 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Hi,

this discussion is drifting a bit from what I intended this thread for,
though I can see how all of it is related.

To let you guys know, here is what I am currently thinking:

I do think the handling of IDs and epochs is rather complicated. I think
the complexity comes from aiming for too much.

1. Currently all the work is aimed at making the FetchRequest completely
empty. This brings all sorts of pain, because the broker then needs to know
what it sent even though it tries to use sendfile as much as possible.
2. Currently the work also aims at keeping the fetch request empty across
TCP sessions.

In this thread I aimed to relax our goals with regard to point 2.
Connection resets are really the exception for us, and I would argue that
introducing complexity to spare one full request on a connection reset is
not worth it. That is why I argued for keeping the server-side information
with the session instead of somewhere global; the global approach is not
going to pay off.

As the discussion unfolds, I also want to challenge our approach to point 1.
I do not see a reason to introduce complexity there (especially on the
fetch response path). Did we consider having the client send just the
offsets it wants to fetch, skip the topic-partition descriptions, and use
the order to match that information back up on the broker side? This would
also reduce the fetch sizes a lot while skipping a ton of complexity.
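
To make that concrete, here is a rough sketch of the order-matching idea,
assuming the broker remembers the ordered partition list from the initial
full request. All names are hypothetical and purely for illustration.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: subsequent requests carry only fetch offsets, and the
// broker matches them back to partitions by their index in the session order.
class OrderMatchedSession {
    private final List<String> orderedPartitions;  // e.g. "topic-0", fixed by the full request

    OrderMatchedSession(List<String> orderedPartitions) {
        this.orderedPartitions = orderedPartitions;
    }

    // offsets[i] is the fetch offset for orderedPartitions.get(i).
    Map<String, Long> resolve(long[] offsets) {
        if (offsets.length != orderedPartitions.size()) {
            throw new IllegalArgumentException("offset count does not match session partition count");
        }
        Map<String, Long> fetchOffsets = new LinkedHashMap<>();
        for (int i = 0; i < offsets.length; i++) {
            fetchOffsets.put(orderedPartitions.get(i), offsets[i]);
        }
        return fetchOffsets;
    }
}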

Hope these ideas are interesting

best Jan



On 01.12.2017 01:47, Becket Qin wrote:

Hi Colin,

Thanks for updating the KIP. I have two comments:

1. The session epoch seems to introduce some complexity. It would be good
if we didn't have to maintain the epoch.
2. If all the partitions have data returned (even a few messages), the next
fetch would be equivalent to a full request. This means that clusters with
continuous small throughput may not save much from the incremental fetch.

I am wondering if we can avoid session epoch maintenance and address the
fetch efficiency in general with some modifications to the solution. I am
not sure whether the following would work, but I just want to share my ideas.

To solve 1, the basic idea is to let the leader return the partition data
along with its expected client position for each partition. If the client
disagrees with the leader's expectation, a full FetchRequest is then sent
to ask the leader to update the client's position.
To solve 2, when possible, we just let the leader infer the client's
position instead of asking the client to provide it, so the incremental
fetch can be empty in most cases.

More specifically, the protocol will have the following changes.
1. Add a new flag called FullFetch to the FetchRequest.
     1) A full FetchRequest is the same as the current FetchRequest with
FullFetch=true.
     2) An incremental FetchRequest is always empty with FullFetch=false.
2. Add a new field called ExpectedPosition (INT64) to each partition data
in the FetchResponse.
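
Roughly, the two additions could be pictured like this (a simplified
sketch only; these are not the actual request/response schemas, and all
surrounding fields are omitted):

// Simplified sketch of the two proposed additions; not the actual wire format.
class FetchRequestData {
    boolean fullFetch;      // new flag: true = full fetch, false = (empty) incremental fetch
    // existing fields (replica id, max wait, min bytes, and, when fullFetch=true,
    // the per-partition topic/partition/fetchOffset/maxBytes entries) are unchanged
}

class FetchResponsePartitionData {
    long expectedPosition;  // new INT64: the position the leader believes the client is at
    // existing per-partition fields (error code, high watermark, record set, ...) are unchanged
}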

The leader logic:
1. The leader keeps a map from client-id (client-uuid) to the interested
partitions of that client. For each interested partition, the leader keeps
that client's position.
2. When the leader receives a full fetch request (FullFetch=true), the
leader
     1) replaces the interested partitions for the client id with the
partitions in that full fetch request.
     2) updates the client position with the offsets specified in that full
fetch request.
     3) if the client is a follower, updates the high watermark, etc.
3. When the leader receives an incremental fetch request (typically empty),
the leader returns the data from all the interested partitions (if any)
according to the positions in the interested-partitions map.
4. In the FetchResponse, the leader includes the ExpectedPosition it thinks
the client is fetching at. The value is the client position of the
partition in the interested-partitions map. This is just to confirm with
the client that the client position held by the leader is correct.
5. After sending back the FetchResponse, the leader updates the positions
of the client's interested partitions. (There may be some overhead for the
leader to know the offsets, but I think the trick of returning at an index
entry boundary or the log end will work efficiently.)
6. The leader will expire the client's interested partitions if the client
hasn't fetched for some time. And if an incremental request is received
when the map does not contain the client info, an error is returned to the
client asking for a FullFetch.
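
A minimal sketch of this leader-side bookkeeping (illustrative only; the
names and the use of plain String keys are assumptions, not actual broker
code):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the interested-partitions map kept by the leader.
class InterestedPartitions {
    // client-id -> (partition -> position the leader believes the client is at)
    private final Map<String, Map<String, Long>> positions = new HashMap<>();

    // Full fetch (FullFetch=true): replace the interested set and positions.
    synchronized void handleFullFetch(String clientId, Map<String, Long> requestedOffsets) {
        positions.put(clientId, new HashMap<>(requestedOffsets));
    }

    // Incremental fetch (typically empty): serve from the remembered positions,
    // or tell the client to fall back to a full fetch if the entry has expired.
    synchronized Map<String, Long> handleIncrementalFetch(String clientId) {
        Map<String, Long> known = positions.get(clientId);
        if (known == null) {
            throw new IllegalStateException("unknown client; a FullFetch is required");
        }
        return new HashMap<>(known);
    }

    // After sending the response, advance the positions past the returned data.
    synchronized void advance(String clientId, Map<String, Long> newEndOffsets) {
        Map<String, Long> known = positions.get(clientId);
        if (known != null) {
            known.putAll(newEndOffsets);
        }
    }
}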

The client logic:
1. Start with sending a full FetchRequest, including partitions and
offsets.
2. When it gets a response, check the ExpectedPosition in the fetch
response and see whether it matches the current log end.
     1) If the ExpectedPosition matches the current log end, the next
fetch request will be an incremental fetch request.
     2) If the ExpectedPosition does not match the current log end, the
next fetch request will be a full FetchRequest.
3. Whenever the partition offset is actively changed (e.g. consumer.seek(),
follower log truncation, etc.), a full FetchRequest is sent.
4. Whenever the interested partition set changes (e.g.
consumer.subscribe()/assign() is called, or replica reassignment happens),
a full FetchRequest is sent.
5. Whenever the client needs to retry a fetch, a FullFetch is sent.
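
A small sketch of this client-side decision (hypothetical names; it only
tracks whether the next request must be a full fetch):

// Hypothetical sketch of the client-side rules 1-5 above; not actual client code.
class FetchModeTracker {
    private boolean fullFetchNeeded = true;  // rule 1: always start with a full FetchRequest

    boolean isNextFetchFull() {
        return fullFetchNeeded;
    }

    // Rule 2: after processing a response, the next fetch is incremental only
    // if every ExpectedPosition in it matched the client's own position
    // (the current log end, in the follower case).
    void onResponse(boolean allExpectedPositionsMatched) {
        fullFetchNeeded = !allExpectedPositionsMatched;
    }

    // Rules 3-5: seeks, log truncation, subscription/assignment changes, and
    // fetch retries all force the next request to be a full fetch.
    void forceFullFetch() {
        fullFetchNeeded = true;
    }
}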

The benefits of this approach are:
1. Regardless of the traffic pattern in the cluster, in most cases the
fetch request will be empty.
2. No need to maintain session epochs.

What do you think?

Thanks,

Jiangjie (Becket) Qin


On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe <cmcc...@apache.org> wrote:

On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
Hey Colin,

Thanks much for the update. I have a few questions below:

1. I am not very sure that we need the Fetch Session Epoch. It seems that
the Fetch Session Epoch is only needed to help the leader distinguish
between "a full fetch request" and "a full fetch request and request a new
incremental fetch session". Alternatively, the follower could also indicate
"a full fetch request and request a new incremental fetch session" by
setting the Fetch Session ID to -1, without using the Fetch Session Epoch.
Does this make sense?

Hi Dong,

The fetch session epoch is very important for ensuring correctness.  It
prevents corrupted or incomplete fetch data due to network reordering or
loss.

For example, consider a scenario where the follower sends a fetch
request to the leader.  The leader responds, but the response is lost
because of network problems which affected the TCP session.  In that
case, the follower must establish a new TCP session and re-send the
incremental fetch request.  But the leader does not know that the
follower didn't receive the previous incremental fetch response.  It is
only the incremental fetch epoch which lets the leader know that it
needs to resend that data, and not data which comes afterwards.

You could construct similar scenarios with message reordering,
duplication, etc.  Basically, this is a stateful protocol on an
unreliable network, and you need to know whether the follower got the
previous data you sent before you move on.  And you need to handle
issues like duplicated or delayed requests.  These issues do not affect
the full fetch request, because it is not stateful-- any full fetch
request can be understood and properly responded to in isolation.
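
As an illustration of that argument (not the KIP's exact rules), here is a
sketch of an epoch check on the leader: session state only advances when
the request's epoch proves the previous response was received, so a
duplicated, delayed, or reordered request cannot corrupt the session, and
the client recovers with a full fetch request.

// Illustrative sketch only; the names and the recovery policy are assumptions.
class FetchSessionState {
    private int lastEpoch = 0;

    enum Verdict { NEW_REQUEST, STALE_OR_REORDERED }

    synchronized Verdict check(int requestEpoch) {
        if (requestEpoch == lastEpoch + 1) {
            lastEpoch = requestEpoch;  // previous response was received; safe to advance state
            return Verdict.NEW_REQUEST;
        }
        // Duplicate, delayed, or out-of-order request: do not advance cached
        // positions; the client falls back to a full fetch request.
        return Verdict.STALE_OR_REORDERED;
    }
}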

2. It is said that the incremental FetchRequest will include partitions
whose fetch offset or maximum number of fetch bytes has changed. If the
follower's logStartOffset of a partition has changed, should this partition
also be included in the next FetchRequest to the leader? Otherwise, it may
affect the handling of DeleteRecordsRequest, because the leader may not
know that the corresponding data has been deleted on the follower.

Yeah, the follower should include the partition if the logStartOffset
has changed.  That should be spelled out in the KIP.  Fixed.

3. In the section "Per-Partition Data", a partition is not considered
dirty if its log start offset has changed. Later, in the section
"FetchRequest Changes", it is said that incremental fetch responses will
include a partition if its logStartOffset has changed. It seems
inconsistent. Can you update the KIP to clarify it?

In the "Per-Partition Data" section, it does say that logStartOffset
changes make a partition dirty, though, right?  The first bullet point
is:

* The LogCleaner deletes messages, and this changes the log start offset
of the partition on the leader, or

4. In the "Fetch Session Caching" section, it is said that each broker has
a limited number of slots. How is this number determined? Does this require
a new broker config for this number?

Good point.  I added two broker configuration parameters to control this
number.

What is the error code if broker does
not have new log for the incoming FetchRequest?

Hmm, is there a typo in this question?  Maybe you meant to ask what
happens if there is no new cache slot for the incoming FetchRequest?
That's not an error-- the incremental fetch session ID just gets set to
0, indicating no incremental fetch session was created.

5. Can you clarify what happens if the follower adds a partition to the
ReplicaFetcherThread after receiving a LeaderAndIsrRequest? Does the leader
need to generate a new session for this ReplicaFetcherThread, or does it
re-use the existing session?  If it uses a new session, is the old session
actively deleted from the slot?

The basic idea is that you can't make changes, except by sending a full
fetch request.  However, perhaps we can allow the client to re-use its
existing session ID.  If the client sets sessionId = id, epoch = 0, it
could re-initialize the session.


BTW, I think it may be useful if the KIP could include an example workflow
of how this feature will be used in case of partition changes and so on.

Yeah, that might help.

best,
Colin

Thanks,
Dong


On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe <cmcc...@apache.org>
wrote:

I updated the KIP with the ideas we've been discussing.
best,
Colin

On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:

On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:

Hi Colin,

thank you for this KIP, it can become a really useful thing.

I just scanned through the discussion so far and wanted to start a thread
to make a decision about keeping the cache with the connection/session or
having some sort of UUID-indexed global map.

Sorry if that has been settled already and I missed it. In this case,
could anyone point me to the discussion?
Hi Jan,

I don't think anyone has discussed the idea of tying the cache to an
individual TCP session yet.  I agree that since the cache is intended to
be used only by a single follower or client, it's an interesting thing to
think about.

I guess the obvious disadvantage is that whenever your TCP session drops,
you have to make a full fetch request rather than an incremental one.
It's not clear to me how often this happens in practice -- it probably
depends a lot on the quality of the network.  From a code perspective, it
might also be a bit difficult to access data associated with the Session
from classes like KafkaApis (although we could refactor it to make this
easier).

It's also clear that even if we tie the cache to the session, we still
have to have limits on the number of caches we're willing to create.  And
probably we should reserve some cache slots for each follower, so that
clients don't take all of them.
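
For illustration, a tiny sketch of such slot reservation (hypothetical
names; per the discussion above, the actual limits would come from broker
configuration parameters):

// Hypothetical sketch: followers may take any free slot, while consumer
// clients are confined to the share not reserved for followers.
class SessionSlotPool {
    private final int maxSlots;
    private final int reservedForFollowers;
    private int followerSlotsUsed = 0;
    private int clientSlotsUsed = 0;

    SessionSlotPool(int maxSlots, int reservedForFollowers) {
        this.maxSlots = maxSlots;
        this.reservedForFollowers = reservedForFollowers;
    }

    synchronized boolean tryAcquire(boolean isFollower) {
        int used = followerSlotsUsed + clientSlotsUsed;
        if (used >= maxSlots) {
            return false;                        // cache is full
        }
        if (isFollower) {
            followerSlotsUsed++;                 // followers may use any free slot
            return true;
        }
        if (clientSlotsUsed < maxSlots - reservedForFollowers) {
            clientSlotsUsed++;                   // clients only use the unreserved share
            return true;
        }
        return false;
    }
}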

I'd rather see a protocol in which the client hints to the broker that it
is going to use the feature, instead of the client realizing that the
broker just offered the feature (regardless of protocol version, which
should only indicate that the feature would be usable).

Hmm.  I'm not sure what you mean by "hinting."  I do think that the
server should have the option of not accepting incremental requests from
specific clients, in order to save memory space.
This seems to work better with metadata attached to the
connection/session than with a map, and could allow for easier client
implementations.
It would also make client-side code easier, as there wouldn't be any
cache-miss error messages to handle.

It is nice not to have to handle cache-miss responses, I agree.
However, TCP sessions aren't exposed to most of our client-side code.
For example, when the Producer creates a message and hands it off to the
NetworkClient, the NC will transparently re-connect and re-send a message
if the first send failed.  The higher-level code will not be informed
about whether the TCP session was re-established, whether an existing TCP
session was used, and so on.  So overall I would still lean towards not
coupling this to the TCP session...

best,
Colin

Thank you again for the KIP. And again, if this was clarified already,
please drop me a hint where I could read about it.

Best Jan





On 21.11.2017 22:02, Colin McCabe wrote:

Hi all,

I created a KIP to improve the scalability and latency of FetchRequest:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

Please take a look.
cheers,
Colin

