Thanks for the explanation, Colin. A few more questions.

> The session epoch is not complex. It's just a number which increments
> on each incremental fetch. The session epoch is also useful for
> debugging-- it allows you to match up requests and responses when
> looking at log files.

Currently each request in Kafka has a correlation id to help match the
requests and responses. Is the epoch doing something different?

> Unfortunately, this doesn't work. Imagine the client misses an
> incremental fetch response about a partition. And then the partition is
> never updated after that. The client has no way to know about the
> partition, since it won't be included in any future incremental fetch
> responses. And there are no offsets to compare, since the partition is
> simply omitted from the response.

I am curious in which situation the follower would miss the response for
a partition. If the entire FetchResponse is lost (e.g. timeout), the
follower would disconnect and retry. That will result in sending a full
FetchRequest. If an error such as NotLeaderForPartition is returned for
some partitions, the follower can always send a full FetchRequest. Is
there a scenario in which only some of the partitions in a FetchResponse
are lost?

Thanks,

Jiangjie (Becket) Qin

On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> > On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe <cmcc...@apache.org>
> > wrote:
> >
> > > On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> > > > Hey Colin,
> > > >
> > > > Thanks much for the update. I have a few questions below:
> > > >
> > > > 1. I am not very sure that we need Fetch Session Epoch. It seems
> > > > that Fetch Session Epoch is only needed to help leader
> > > > distinguish between "a full fetch request" and "a full fetch
> > > > request and request a new incremental fetch session".
> > > > Alternatively, follower can also indicate "a full fetch request
> > > > and request a new incremental fetch session" by setting Fetch
> > > > Session ID to -1 without using Fetch Session Epoch. Does this
> > > > make sense?
> > >
> > > Hi Dong,
> > >
> > > The fetch session epoch is very important for ensuring correctness.
> > > It prevents corrupted or incomplete fetch data due to network
> > > reordering or loss.
> > >
> > > For example, consider a scenario where the follower sends a fetch
> > > request to the leader. The leader responds, but the response is lost
> > > because of network problems which affected the TCP session. In that
> > > case, the follower must establish a new TCP session and re-send the
> > > incremental fetch request. But the leader does not know that the
> > > follower didn't receive the previous incremental fetch response. It
> > > is only the incremental fetch epoch which lets the leader know that
> > > it needs to resend that data, and not data which comes afterwards.
> > >
> > > You could construct similar scenarios with message reordering,
> > > duplication, etc. Basically, this is a stateful protocol on an
> > > unreliable network, and you need to know whether the follower got
> > > the previous data you sent before you move on. And you need to
> > > handle issues like duplicated or delayed requests. These issues do
> > > not affect the full fetch request, because it is not stateful-- any
> > > full fetch request can be understood and properly responded to in
> > > isolation.
> > >
> >
> > Thanks for the explanation. This makes sense. On the other hand I
> > would be interested in learning more about whether Becket's solution
> > can help simplify the protocol by not having the epoch field and
> > whether that is worth doing.
>
> Hi Dong,
>
> I commented about this in the other thread. A solution which doesn't
> maintain session information doesn't work here.
>
> >
> > > >
> > > > 2. It is said that Incremental FetchRequest will include
> > > > partitions whose fetch offset or maximum number of fetch bytes
> > > > has been changed. If follower's logStartOffset of a partition has
> > > > changed, should this partition also be included in the next
> > > > FetchRequest to the leader?
> > > > Otherwise, it may affect the handling of DeleteRecordsRequest
> > > > because leader may not know the corresponding data has been
> > > > deleted on the follower.
> > >
> > > Yeah, the follower should include the partition if the
> > > logStartOffset has changed. That should be spelled out on the KIP.
> > > Fixed.
> > >
> > > >
> > > > 3. In the section "Per-Partition Data", a partition is not
> > > > considered dirty if its log start offset has changed. Later in
> > > > the section "FetchRequest Changes", it is said that incremental
> > > > fetch responses will include a partition if its logStartOffset
> > > > has changed. It seems inconsistent. Can you update the KIP to
> > > > clarify it?
> > > >
> > >
> > > In the "Per-Partition Data" section, it does say that logStartOffset
> > > changes make a partition dirty, though, right? The first bullet
> > > point is:
> > >
> > > > * The LogCleaner deletes messages, and this changes the log start
> > > >   offset of the partition on the leader., or
> > >
> >
> > Ah I see. I think I didn't notice this because the statement assumes
> > that the LogStartOffset in the leader only changes due to LogCleaner.
> > In fact the LogStartOffset can change on the leader due to either log
> > retention or DeleteRecordsRequest. I haven't verified whether
> > LogCleaner can change LogStartOffset though. It may be a bit better to
> > just say that a partition is considered dirty if LogStartOffset
> > changes.
>
> I agree. It should be straightforward to just resend the partition if
> logStartOffset changes.
>
> >
> > > >
> > > > 4. In "Fetch Session Caching" section, it is said that each broker
> > > > has a limited number of slots. How is this number determined? Does
> > > > this require a new broker config for this number?
> > >
> > > Good point. I added two broker configuration parameters to control
> > > this number.
> > >
> >
> > I am curious to see whether we can avoid some of these new configs.
> > For example, incremental.fetch.session.cache.slots.per.broker is
> > probably not necessary because if a leader knows that a FetchRequest
> > comes from a follower, we probably want the leader to always cache
> > the information from that follower. Does this make sense?
>
> Yeah, maybe we can avoid having
> incremental.fetch.session.cache.slots.per.broker.
>
> >
> > Maybe we can discuss the config later after there is agreement on
> > what the protocol would look like.
> >
> > > >
> > > > What is the error code if broker does not have new log for the
> > > > incoming FetchRequest?
> > >
> > > Hmm, is there a typo in this question? Maybe you meant to ask what
> > > happens if there is no new cache slot for the incoming FetchRequest?
> > > That's not an error-- the incremental fetch session ID just gets set
> > > to 0, indicating no incremental fetch session was created.
> > >
> >
> > Yeah there is a typo. You have answered my question.
> >
> > > >
> > > > 5. Can you clarify what happens if follower adds a partition to
> > > > the ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does
> > > > the leader need to generate a new session for this
> > > > ReplicaFetcherThread or does it re-use the existing session? If it
> > > > uses a new session, is the old session actively deleted from the
> > > > slot?
> > >
> > > The basic idea is that you can't make changes, except by sending a
> > > full fetch request. However, perhaps we can allow the client to
> > > re-use its existing session ID. If the client sets sessionId = id,
> > > epoch = 0, it could re-initialize the session.
> > >
> >
> > Yeah I agree with the basic idea. We probably want to understand more
> > detail about how this works later.
>
> Sounds good. I updated the KIP with this information. A
> re-initialization should be exactly the same as an initialization,
> except that it reuses an existing ID.
>
> best,
> Colin
>
> >
> > > >
> > > > BTW, I think it may be useful if the KIP can include the example
> > > > workflow of how this feature will be used in case of partition
> > > > change and so on.
> > >
> > > Yeah, that might help.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe
> > > > <cmcc...@apache.org> wrote:
> > > >
> > > > > I updated the KIP with the ideas we've been discussing.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> > > > > > On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> > > > > > > Hi Colin, thank you for this KIP, it can become a really
> > > > > > > useful thing.
> > > > > > >
> > > > > > > I just scanned through the discussion so far and wanted to
> > > > > > > start a thread to make a decision about keeping the
> > > > > > > cache with the Connection / Session or having some sort of
> > > > > > > UUID indexed global Map.
> > > > > > >
> > > > > > > Sorry if that has been settled already and I missed it. In
> > > > > > > this case could anyone point me to the discussion?
> > > > > >
> > > > > > Hi Jan,
> > > > > >
> > > > > > I don't think anyone has discussed the idea of tying the cache
> > > > > > to an individual TCP session yet. I agree that since the cache
> > > > > > is intended to be used only by a single follower or client,
> > > > > > it's an interesting thing to think about.
> > > > > >
> > > > > > I guess the obvious disadvantage is that whenever your TCP
> > > > > > session drops, you have to make a full fetch request rather
> > > > > > than an incremental one. It's not clear to me how often this
> > > > > > happens in practice -- it probably depends a lot on the
> > > > > > quality of the network.
> > > > > > From a code perspective, it might also be a bit difficult to
> > > > > > access data associated with the Session from classes like
> > > > > > KafkaApis (although we could refactor it to make this easier).
> > > > > >
> > > > > > It's also clear that even if we tie the cache to the session,
> > > > > > we still have to have limits on the number of caches we're
> > > > > > willing to create. And probably we should reserve some cache
> > > > > > slots for each follower, so that clients don't take all of
> > > > > > them.
> > > > > >
> > > > > > >
> > > > > > > I'd rather see a protocol in which the client is hinting to
> > > > > > > the broker that it is going to use the feature instead of a
> > > > > > > client realizing that the broker just offered the feature
> > > > > > > (regardless of protocol version which should only indicate
> > > > > > > that the feature would be usable).
> > > > > >
> > > > > > Hmm. I'm not sure what you mean by "hinting." I do think that
> > > > > > the server should have the option of not accepting incremental
> > > > > > requests from specific clients, in order to save memory space.
> > > > > >
> > > > > > > This seems to work better with a per connection/session
> > > > > > > attached Metadata than with a Map and could allow for
> > > > > > > easier client implementations.
> > > > > > > It would also make Client-side code easier as there wouldn't
> > > > > > > be any Cache-miss error Messages to handle.
> > > > > >
> > > > > > It is nice not to have to handle cache-miss responses, I
> > > > > > agree. However, TCP sessions aren't exposed to most of our
> > > > > > client-side code. For example, when the Producer creates a
> > > > > > message and hands it off to the NetworkClient, the NC will
> > > > > > transparently re-connect and re-send a message if the first
> > > > > > send failed.
> > > > > > The higher-level code will not be informed about whether the
> > > > > > TCP session was re-established, whether an existing TCP
> > > > > > session was used, and so on. So overall I would still lean
> > > > > > towards not coupling this to the TCP session...
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > > >
> > > > > > > Thank you again for the KIP. And again, if this was
> > > > > > > clarified already please drop me a hint where I could read
> > > > > > > about it.
> > > > > > >
> > > > > > > Best Jan
> > > > > > >
> > > > > > > On 21.11.2017 22:02, Colin McCabe wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I created a KIP to improve the scalability and latency of
> > > > > > > > FetchRequest:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
> > > > > > > >
> > > > > > > > Please take a look.
> > > > > > > >
> > > > > > > > cheers,
> > > > > > > > Colin
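
For readers following the thread, here is a rough sketch of the session
ID / epoch bookkeeping discussed above. It is a hypothetical illustration
in Python, not the KIP-227 implementation: the names Leader, FetchSession,
full_fetch and incremental_fetch are invented for this example, and
rejecting a mismatched epoch (so the follower falls back to a full fetch)
is a simplifying assumption rather than behaviour confirmed in the thread.
Details taken from the thread: the epoch increments on each incremental
fetch, sessionId = id with epoch = 0 re-initializes a session, a session ID
of 0 means no session was created, and a partition is resent when its state
(for example logStartOffset) changes.

```python
from dataclasses import dataclass, field


@dataclass
class FetchSession:
    session_id: int
    next_epoch: int = 1                             # epoch the leader expects next
    partitions: dict = field(default_factory=dict)  # partition -> last state sent


class Leader:
    """Toy leader that tracks incremental fetch sessions (hypothetical)."""

    def __init__(self, max_slots):
        self.max_slots = max_slots  # limit on cached sessions
        self.sessions = {}          # session_id -> FetchSession
        self.next_id = 1            # 0 is reserved for "no session"
        self.log = {}               # partition -> (log_start_offset, high_watermark)

    def full_fetch(self, session_id=0):
        """Epoch 0: return every partition and (re)initialize a session."""
        if session_id not in self.sessions:
            if len(self.sessions) >= self.max_slots:
                # No free cache slot: not an error, just no session.
                return 0, dict(self.log)
            session_id = self.next_id
            self.next_id += 1
        self.sessions[session_id] = FetchSession(session_id, 1, dict(self.log))
        return session_id, dict(self.log)

    def incremental_fetch(self, session_id, epoch):
        """Return only partitions whose state changed since the last response."""
        session = self.sessions.get(session_id)
        if session is None or epoch != session.next_epoch:
            # Assumption: a duplicated, delayed or unknown request is rejected.
            raise ValueError("stale or unknown session; send a full fetch")
        dirty = {p: s for p, s in self.log.items()
                 if session.partitions.get(p) != s}
        session.partitions.update(dirty)
        session.next_epoch += 1
        return dirty
```

A follower in this sketch would call full_fetch() once, then call
incremental_fetch(session_id, epoch) with epoch 1, 2, 3 and so on, and go
back to full_fetch(session_id) whenever the leader rejects a request.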