Thanks for responding, Colin.

bq. If we have a bunch of small fetch sessions and a bigger client comes in, we might have to evict many small sessions to fit the bigger one.
Suppose there were N small fetch sessions and 1 big fetch session comes in. If the plan is to use the number of partitions to approximate heap consumption, that should be good enough, IMHO. Evicting only one of the N small fetch sessions may not release enough memory, since the big session would increase the total partition count by a lot.

Cheers

On Tue, Dec 5, 2017 at 1:44 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Tue, Dec 5, 2017, at 11:24, Ted Yu wrote:
> > bq. We also have a tunable for total number of cache slots. We never
> > cache more than this number of incremental fetch sessions.
> >
> > Is it possible to manage the cache based on heap consumption instead of
> > number of slots ?
> > It seems heap estimation can be done by counting PartitionData (along
> > with overhead for related Map structure).
>
> Hi Ted,
>
> That's an interesting idea. I think it starts to get complicated, though.
>
> For example, suppose we later implement incrementally adding partitions
> to the fetch session. When a fetch session adds more partitions, it uses
> more memory. So should this trigger an eviction?
>
> If we have a bunch of small fetch sessions and a bigger client comes in,
> we might have to evict many small sessions to fit the bigger one. But we
> probably do want to fit the bigger one in, since bigger requests gain
> proportionally more from being incremental.
>
> [ Small digression: In general fetch requests have some fixed cost plus a
> variable cost based on the number of partitions. The more partitions you
> add, the more the variable cost comes to dominate. Therefore, it is
> especially good to make big fetch requests into incremental fetch
> requests. Small fetch requests for one or two partitions may not gain
> much, since their cost is dominated by the fixed cost anyway (message
> header, TCP overhead, IP packet overhead, etc.) ]
>
> Overall, I would still lean towards limiting the number of incremental
> fetch sessions, rather than trying to create a per-partition data memory
> limit. I think the complexity is probably not worth it. The memory limit
> is more of a sanity check anyway, than a fine-grained limit. If we can
> get the really big clients using incremental fetches, and the followers
> using incremental fetches, we have captured most of the benefits. I'm
> curious if there is a more elegant way to limit per-partition that I may
> have missed, though?
>
> best,
> Colin
>
> >
> > Cheers
> >
> > On Tue, Dec 5, 2017 at 11:02 AM, Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > On Tue, Dec 5, 2017, at 08:51, Jason Gustafson wrote:
> > > > Hi Colin,
> > > >
> > > > Thanks for the response. A couple replies:
> > > >
> > > > > I’m a bit ambivalent about letting the client choose the session
> > > > > timeout. What if clients choose timeouts that are too long? Hmm....
> > > > > I do agree the timeout should be sized proportional to
> > > > > max.poll.interval.ms.
> > > >
> > > > We have solved this in other cases by letting the broker enforce a
> > > > maximum timeout. After thinking about it a bit, it's probably overkill
> > > > in this case since the caching is just an optimization. Instead of
> > > > stressing over timeouts and such, I am actually wondering if we just
> > > > need a reasonable session cache eviction policy. For example, when the
> > > > number of slots is exceeded, perhaps you evict the session with the
> > > > fewest partitions or the one with the largest interval between fetches.
> > > > We could give priority to the replicas.
Perhaps this might let us get rid of a few of the > configs. > > > > > > I agree that it would be nice to get rid of the tunable for eviction > > > time. However, I'm concerned that if we do, we might run into cache > > > thrashing. For example, if we have N cache slots and N+1 clients that > > > are all fetching continuously, we might have to evict a client on every > > > single fetch. It would be much better to give a cache slot to N > clients > > > and let the last client do full fetch requests. > > > > > > Perhaps we could mitigate this problem by evicting the smallest fetch > > > session-- the one that is for the smallest number of partitions. This > > > would allow "big" clients that fetch many partitions (e.g. MirrorMaker) > > > to get priority. But then you run into the problem where someone > > > fetches a huge number of partitions, and then goes away for a long > time, > > > and you never reuse that cache memory. > > > > > > How about this approach? We have a tunable for minimum eviction time > > > (default 2 minutes). We cannot evict a client before this timeout has > > > expired. We also have a tunable for total number of cache slots. We > > > never cache more than this number of incremental fetch sessions. > > > > > > Sessions become eligible for eviction after 2 minutes, whether or not > > > the session is active. > > > Fetch Request A will evict Fetch Request B if and only if: > > > 1. A has been active in the last 2 minutes and B has not, OR > > > 2. A was made by a follower and B was made by a consumer, OR > > > 3. A has more partitions than B, OR > > > 4. A is newer than B > > > > > > Then, in a setup where consumers are fetching different numbers of > > > partitions, we will eventually converge on giving incremental fetch > > > sessions to the big consumers, and not to the small consumers. In a > > > setup where consumers are all of equal size but the cache is too small > > > for all of them, we still thrash, but slowly. Nobody can be evicted > > > before their 2 minutes are up. So in general, the overhead of the > extra > > > full requests is still low. If someone makes a big request and then > > > shuts down, they get cleaned up after 2 minutes, because of condition > > > #1. And there are only two tunables needed: cache size and eviction > > > time. > > > > > > > > > > > The main reason is if there is a bug in the incremental fetch > feature. > > > > > > > > > > > > > This was in response to my question about removing the consumer > config. > > > > And sure, any new feature may have bugs, but that's what we have > testing > > > for > > > > ;). Users can always fall back to a previous version if there are any > > > > major problems. As you know, it's tough removing configs once they > are > > > there, > > > > so I think we should try to add them only if they make sense in the > long > > > > term. > > > > > > That's a fair point. I guess if we do need to disable incremental > > > fetches in production because of a bug, we can modify the broker > > > configuration to do so (by setting 0 cache slots). > > > > > > best, > > > Colin > > > > > > > > > > > Thanks, > > > > Jason > > > > > > > > On Mon, Dec 4, 2017 at 11:06 PM, Colin McCabe <cmcc...@apache.org> > > > wrote: > > > > > > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote: > > > > > > > > > > > > > > > > > > On 03.12.2017 21:55, Colin McCabe wrote: > > > > > > > On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote: > > > > > > >> Thanks for the explanation, Colin. A few more questions. 
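To make the eviction rules above concrete, here is a rough sketch (in Java) of how that ordering could be expressed, with partition count standing in for heap usage, per the point at the top of this mail. Every name below is made up for illustration; none of it is from the KIP or the Kafka code base.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Illustrative bookkeeping for one cached incremental fetch session.
    class CachedFetchSession {
        final long sessionId;
        final boolean fromFollower;   // replica fetchers outrank consumers
        final int partitionCount;     // proxy for the heap held by the cached per-partition data
        final long createdMs;
        final long lastUsedMs;

        CachedFetchSession(long sessionId, boolean fromFollower, int partitionCount,
                           long createdMs, long lastUsedMs) {
            this.sessionId = sessionId;
            this.fromFollower = fromFollower;
            this.partitionCount = partitionCount;
            this.createdMs = createdMs;
            this.lastUsedMs = lastUsedMs;
        }

        // "Nobody can be evicted before their 2 minutes are up."
        boolean eligibleForEviction(long nowMs, long minEvictionTimeMs) {
            return nowMs - createdMs >= minEvictionTimeMs;
        }

        boolean recentlyActive(long nowMs, long minEvictionTimeMs) {
            return nowMs - lastUsedMs < minEvictionTimeMs;
        }
    }

    class EvictionPolicySketch {
        // Orders sessions from most evictable to least evictable, following the
        // conditions listed above: inactive before active, consumer before
        // follower, fewer partitions before more partitions, older before newer.
        static Comparator<CachedFetchSession> evictionOrder(long nowMs, long minEvictionTimeMs) {
            return Comparator
                .comparing((CachedFetchSession s) -> s.recentlyActive(nowMs, minEvictionTimeMs))
                .thenComparing((CachedFetchSession s) -> s.fromFollower)
                .thenComparingInt(s -> s.partitionCount)
                .thenComparingLong(s -> s.createdMs);
        }

        // The heap-based variant discussed above: to admit one big session we may
        // have to evict several small ones, because each small session frees only
        // a few partitions' worth of memory. Callers pass only sessions that are
        // already eligible for eviction.
        static List<CachedFetchSession> victimsToFit(List<CachedFetchSession> eligible,
                                                     int partitionsNeeded,
                                                     long nowMs, long minEvictionTimeMs) {
            eligible.sort(evictionOrder(nowMs, minEvictionTimeMs));
            List<CachedFetchSession> victims = new ArrayList<>();
            int partitionsFreed = 0;
            for (CachedFetchSession s : eligible) {
                if (partitionsFreed >= partitionsNeeded) {
                    break;
                }
                victims.add(s);
                partitionsFreed += s.partitionCount;
            }
            return victims;
        }
    }

With the slot-count approach in the proposal, only the comparator is needed (a newcomer evicts at most one existing session); victimsToFit is only there to show why heap-based accounting gets more complicated.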
> > > > > > >> > > > > > > >>> The session epoch is not complex. It's just a number which > > > > > increments > > > > > > >>> on each incremental fetch. The session epoch is also useful > for > > > > > > >>> debugging-- it allows you to match up requests and responses > when > > > > > > >>> looking at log files. > > > > > > >> Currently each request in Kafka has a correlation id to help > > > match the > > > > > > >> requests and responses. Is epoch doing something differently? > > > > > > > Hi Becket, > > > > > > > > > > > > > > The correlation ID is used within a single TCP session, to > uniquely > > > > > > > associate a request with a response. The correlation ID is not > > > unique > > > > > > > (and has no meaning) outside the context of that single TCP > > > session. > > > > > > > > > > > > > > Keep in mind, NetworkClient is in charge of TCP sessions, and > > > generally > > > > > > > tries to hide that information from the upper layers of the > code. > > > So > > > > > > > when you submit a request to NetworkClient, you don't know if > that > > > > > > > request creates a TCP session, or reuses an existing one. > > > > > > >>> Unfortunately, this doesn't work. Imagine the client misses > an > > > > > > >>> increment fetch response about a partition. And then the > > > partition > > > > > is > > > > > > >>> never updated after that. The client has no way to know > about > > > the > > > > > > >>> partition, since it won't be included in any future > incremental > > > fetch > > > > > > >>> responses. And there are no offsets to compare, since the > > > partition > > > > > is > > > > > > >>> simply omitted from the response. > > > > > > >> I am curious about in which situation would the follower miss > a > > > > > response > > > > > > >> of a partition. If the entire FetchResponse is lost (e.g. > > > timeout), > > > > > the > > > > > > >> follower would disconnect and retry. That will result in > sending a > > > > > full > > > > > > >> FetchRequest. > > > > > > > Basically, you are proposing that we rely on TCP for reliable > > > delivery > > > > > > > in a distributed system. That isn't a good idea for a bunch of > > > > > > > different reasons. First of all, TCP timeouts tend to be very > > > long. > > > > > So > > > > > > > if the TCP session timing out is your error detection > mechanism, > > > you > > > > > > > have to wait minutes for messages to timeout. Of course, we > add a > > > > > > > timeout on top of that after which we declare the connection > bad > > > and > > > > > > > manually close it. But just because the session is closed on > one > > > end > > > > > > > doesn't mean that the other end knows that it is closed. So > the > > > leader > > > > > > > may have to wait quite a long time before TCP decides that yes, > > > > > > > connection X from the follower is dead and not coming back, > even > > > though > > > > > > > gremlins ate the FIN packet which the follower attempted to > > > translate. > > > > > > > If the cache state is tied to that TCP session, we have to keep > > > that > > > > > > > cache around for a much longer time than we should. > > > > > > Hi, > > > > > > > > > > > > I see this from a different perspective. The cache expiry time > > > > > > has the same semantic as idle connection time in this scenario. > > > > > > It is the time range we expect the client to come back an reuse > > > > > > its broker side state. I would argue that on close we would get > an > > > > > > extra shot at cleaning up the session state early. 
As opposed to > > > > > > always wait for that duration for expiry to happen. > > > > > > > > > > Hi Jan, > > > > > > > > > > The idea here is that the incremental fetch cache expiry time can > be > > > > > much shorter than the TCP session timeout. In general the TCP > session > > > > > timeout is common to all TCP connections, and very long. To make > these > > > > > numbers a little more concrete, the TCP session timeout is often > > > > > configured to be 2 hours on Linux. (See > > > > > https://www.cyberciti.biz/tips/linux-increasing-or- > > > decreasing-tcp-sockets- > > > > > timeouts.html > > > > > ) The timeout I was proposing for incremental fetch sessions was > one > > > or > > > > > two minutes at most. > > > > > > > > > > > > > > > > > > Secondly, from a software engineering perspective, it's not a > good > > > idea > > > > > > > to try to tightly tie together TCP and our code. We would > have to > > > > > > > rework how we interact with NetworkClient so that we are aware > of > > > > > things > > > > > > > like TCP sessions closing or opening. We would have to be > careful > > > > > > > preserve the ordering of incoming messages when doing things > like > > > > > > > putting incoming requests on to a queue to be processed by > multiple > > > > > > > threads. It's just a lot of complexity to add, and there's no > > > upside. > > > > > > I see the point here. And I had a small chat with Dong Lin > already > > > > > > making me aware of this. I tried out the approaches and propose > the > > > > > > following: > > > > > > > > > > > > The client start and does a full fetch. It then does incremental > > > fetches. > > > > > > The connection to the broker dies and is re-established by > > > NetworkClient > > > > > > under the hood. > > > > > > The broker sees an incremental fetch without having state => > returns > > > > > > error: > > > > > > Client sees the error, does a full fetch and goes back to > > > incrementally > > > > > > fetching. > > > > > > > > > > > > having this 1 additional error round trip is essentially the > same as > > > > > > when something > > > > > > with the sessions or epoch changed unexpectedly to the client > (say > > > > > > expiry). > > > > > > > > > > > > So its nothing extra added but the conditions are easier to > evaluate. > > > > > > Especially since we do everything with NetworkClient. Other > > > implementers > > > > > > on the > > > > > > protocol are free to optimizes this and do not do the errornours > > > > > > roundtrip on the > > > > > > new connection. > > > > > > Its a great plus that the client can know when the error is gonna > > > > > > happen. instead of > > > > > > the server to always have to report back if something changes > > > > > > unexpectedly for the client > > > > > > > > > > You are assuming that the leader and the follower agree that the > TCP > > > > > session drops at the same time. When there are network problems, > this > > > > > may not be true. The leader may still think the previous TCP > session > > > is > > > > > active. In that case, we have to keep the incremental fetch > session > > > > > state around until we learn otherwise (which could be up to that 2 > hour > > > > > timeout I mentioned). And if we get a new incoming incremental > fetch > > > > > request, we can't assume that it replaces the previous one, > because the > > > > > IDs will be different (the new one starts a new session). 
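As a rough illustration of the flow Jan describes (full fetch, then incremental fetches, reconnect, one error round trip, then back to incremental), the client-side state could be as simple as the sketch below. The class name and the SESSION_NOT_FOUND outcome are placeholders, not names from the KIP.

    // Illustrative client-side session state; names are placeholders only.
    class IncrementalFetchStateSketch {
        enum FetchError { NONE, SESSION_NOT_FOUND }

        static final long NO_SESSION_ID = 0L;   // "0" meaning no incremental session, as in the discussion

        long sessionId = NO_SESSION_ID;
        int epoch = 0;

        // Decide what the next fetch request looks like.
        String nextRequestType() {
            if (sessionId == NO_SESSION_ID) {
                // No broker-side state (first fetch, or the broker lost/evicted it):
                // send a full fetch listing every partition and ask for a new session.
                return "full fetch";
            }
            // Otherwise send only the partitions whose fetch state has changed.
            epoch++;
            return "incremental fetch, sessionId=" + sessionId + ", epoch=" + epoch;
        }

        // Apply the broker's response.
        void onResponse(FetchError error, long responseSessionId) {
            if (error == FetchError.SESSION_NOT_FOUND) {
                // The broker no longer has our session (e.g. it expired or was
                // evicted while we were reconnecting). Forget it; the next request
                // falls back to a full fetch -- the single extra round trip Jan mentions.
                sessionId = NO_SESSION_ID;
                epoch = 0;
            } else if (responseSessionId != NO_SESSION_ID) {
                sessionId = responseSessionId;   // session created or continued by the broker
            }
        }
    }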
> > > > > > > > > > > > > > > > > > Imagine that I made an argument that client IDs are "complex" > and > > > > > should > > > > > > > be removed from our APIs. After all, we can just look at the > > > remote IP > > > > > > > address and TCP port of each connection. Would you think that > was > > > a > > > > > > > good idea? The client ID is useful when looking at logs. For > > > example, > > > > > > > if a rebalance is having problems, you want to know what > clients > > > were > > > > > > > having a problem. So having the client ID field to guide you > is > > > > > > > actually much less "complex" in practice than not having an ID. > > > > > > I still cant follow why the correlation idea will not help here. > > > > > > Correlating logs with it usually works great. Even with primitive > > > tools > > > > > > like grep > > > > > > > > > > The correlation ID does help somewhat, but certainly not as much > as a > > > > > unique 64-bit ID. The correlation ID is not unique in the broker, > just > > > > > unique to a single NetworkClient. Simiarly, the correlation ID is > not > > > > > unique on the client side, if there are multiple Consumers, etc. > > > > > > > > > > > > > > > > > > Similarly, if metadata responses had epoch numbers (simple > > > incrementing > > > > > > > numbers), we would not have to debug problems like clients > > > accidentally > > > > > > > getting old metadata from servers that had been partitioned off > > > from > > > > > the > > > > > > > network for a while. Clients would know the difference between > > > old and > > > > > > > new metadata. So putting epochs in to the metadata request is > much > > > > > less > > > > > > > "complex" operationally, even though it's an extra field in the > > > > > request. > > > > > > > This has been discussed before on the mailing list. > > > > > > > > > > > > > > So I think the bottom line for me is that having the session > ID and > > > > > > > session epoch, while it adds two extra fields, reduces > operational > > > > > > > complexity and increases debuggability. It avoids tightly > > > coupling us > > > > > > > to assumptions about reliable ordered delivery which tend to be > > > > > violated > > > > > > > in practice in multiple layers of the stack. Finally, it > avoids > > > the > > > > > > > necessity of refactoring NetworkClient. > > > > > > So there is stacks out there that violate TCP guarantees? And > > > software > > > > > > still works? How can this be? Can you elaborate a little where > this > > > > > > can be violated? I am not very familiar with virtualized > environments > > > > > > but they can't really violate TCP contracts. > > > > > > > > > > TCP's guarantees of reliable, in-order transmission certainly can > be > > > > > violated. For example, I once had to debug a cluster where a > certain > > > > > node had a network card which corrupted its transmissions > occasionally. > > > > > With all the layers of checksums, you would think that this was not > > > > > possible, but it happened. We occasionally got corrupted data > written > > > > > to disk on the other end because of it. Even more frustrating, the > > > data > > > > > was not corrupted on disk on the sending node-- it was a bug in the > > > > > network card driver that was injecting the errors. > > > > > > > > > > However, my point was not about TCP's guarantees being violated. > My > > > > > point is that TCP's guarantees are only one small building block to > > > > > build a robust distributed system. 
TCP basically just says that > if you > > > > > get any bytes from the stream, you will get the ones that were > sent by > > > > > the sender, in the order they were sent. TCP does not guarantee > that > > > > > the bytes you send will get there. It does not guarantee that if > you > > > > > close the connection, the other end will know about it in a timely > > > > > fashion. It does not guarantee that the bytes will be received in > a > > > > > certain timeframe, and certainly doesn't guarantee that if you > send a > > > > > byte on connection X and then on connection Y, that the remote end > will > > > > > read a byte on X before reading a byte on Y. > > > > > > > > > > best, > > > > > Colin > > > > > > > > > > > > > > > > > Hope this made my view clearer, especially the first part. > > > > > > > > > > > > Best Jan > > > > > > > > > > > > > > > > > > > best, > > > > > > > Colin > > > > > > > > > > > > > > > > > > > > >> If there is an error such as NotLeaderForPartition is > > > > > > >> returned for some partitions, the follower can always send a > full > > > > > > >> FetchRequest. Is there a scenario that only some of the > > > partitions in > > > > > a > > > > > > >> FetchResponse is lost? > > > > > > >> > > > > > > >> Thanks, > > > > > > >> > > > > > > >> Jiangjie (Becket) Qin > > > > > > >> > > > > > > >> > > > > > > >> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe< > cmcc...@apache.org> > > > > > wrote: > > > > > > >> > > > > > > >>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote: > > > > > > >>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe< > > > cmcc...@apache.org> > > > > > > >>> wrote: > > > > > > >>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote: > > > > > > >>>>>> Hey Colin, > > > > > > >>>>>> > > > > > > >>>>>> Thanks much for the update. I have a few questions below: > > > > > > >>>>>> > > > > > > >>>>>> 1. I am not very sure that we need Fetch Session Epoch. It > > > seems > > > > > that > > > > > > >>>>>> Fetch > > > > > > >>>>>> Session Epoch is only needed to help leader distinguish > > > between "a > > > > > > >>> full > > > > > > >>>>>> fetch request" and "a full fetch request and request a new > > > > > > >>> incremental > > > > > > >>>>>> fetch session". Alternatively, follower can also indicate > "a > > > full > > > > > > >>> fetch > > > > > > >>>>>> request and request a new incremental fetch session" by > > > setting > > > > > Fetch > > > > > > >>>>>> Session ID to -1 without using Fetch Session Epoch. Does > this > > > make > > > > > > >>> sense? > > > > > > >>>>> Hi Dong, > > > > > > >>>>> > > > > > > >>>>> The fetch session epoch is very important for ensuring > > > > > correctness. It > > > > > > >>>>> prevents corrupted or incomplete fetch data due to network > > > > > reordering > > > > > > >>> or > > > > > > >>>>> loss. > > > > > > >>>>> > > > > > > >>>>> For example, consider a scenario where the follower sends a > > > fetch > > > > > > >>>>> request to the leader. The leader responds, but the > response > > > is > > > > > lost > > > > > > >>>>> because of network problems which affected the TCP > session. In > > > > > that > > > > > > >>>>> case, the follower must establish a new TCP session and > > > re-send the > > > > > > >>>>> incremental fetch request. But the leader does not know > that > > > the > > > > > > >>>>> follower didn't receive the previous incremental fetch > > > response. 
> > > > > It is > > > > > > >>>>> only the incremental fetch epoch which lets the leader know > > > that it > > > > > > >>>>> needs to resend that data, and not data which comes > afterwards. > > > > > > >>>>> > > > > > > >>>>> You could construct similar scenarios with message > reordering, > > > > > > >>>>> duplication, etc. Basically, this is a stateful protocol > on an > > > > > > >>>>> unreliable network, and you need to know whether the > follower > > > got > > > > > the > > > > > > >>>>> previous data you sent before you move on. And you need to > > > handle > > > > > > >>>>> issues like duplicated or delayed requests. These issues > do > > > not > > > > > affect > > > > > > >>>>> the full fetch request, because it is not stateful-- any > full > > > fetch > > > > > > >>>>> request can be understood and properly responded to in > > > isolation. > > > > > > >>>>> > > > > > > >>>> Thanks for the explanation. This makes sense. On the other > hand > > > I > > > > > would > > > > > > >>>> be interested in learning more about whether Becket's > solution > > > can > > > > > help > > > > > > >>>> simplify the protocol by not having the echo field and > whether > > > that > > > > > is > > > > > > >>>> worth doing. > > > > > > >>> Hi Dong, > > > > > > >>> > > > > > > >>> I commented about this in the other thread. A solution which > > > doesn't > > > > > > >>> maintain session information doesn't work here. > > > > > > >>> > > > > > > >>>> > > > > > > >>>>>> 2. It is said that Incremental FetchRequest will include > > > > > partitions > > > > > > >>> whose > > > > > > >>>>>> fetch offset or maximum number of fetch bytes has been > > > changed. If > > > > > > >>>>>> follower's logStartOffet of a partition has changed, > should > > > this > > > > > > >>>>>> partition also be included in the next FetchRequest to the > > > leader? > > > > > > >>>>> Otherwise, it > > > > > > >>>>>> may affect the handling of DeleteRecordsRequest because > > > leader may > > > > > > >>> not > > > > > > >>>>> know > > > > > > >>>>>> the corresponding data has been deleted on the follower. > > > > > > >>>>> Yeah, the follower should include the partition if the > > > > > logStartOffset > > > > > > >>>>> has changed. That should be spelled out on the KIP. > Fixed. > > > > > > >>>>> > > > > > > >>>>>> 3. In the section "Per-Partition Data", a partition is not > > > > > considered > > > > > > >>>>>> dirty if its log start offset has changed. Later in the > > > section > > > > > > >>>>> "FetchRequest > > > > > > >>>>>> Changes", it is said that incremental fetch responses will > > > > > include a > > > > > > >>>>>> partition if its logStartOffset has changed. It seems > > > > > inconsistent. > > > > > > >>> Can > > > > > > >>>>>> you update the KIP to clarify it? > > > > > > >>>>>> > > > > > > >>>>> In the "Per-Partition Data" section, it does say that > > > > > logStartOffset > > > > > > >>>>> changes make a partition dirty, though, right? The first > > > bullet > > > > > point > > > > > > >>>>> is: > > > > > > >>>>> > > > > > > >>>>>> * The LogCleaner deletes messages, and this changes the > log > > > start > > > > > > >>> offset > > > > > > >>>>> of the partition on the leader., or > > > > > > >>>>> > > > > > > >>>> Ah I see. I think I didn't notice this because statement > assumes > > > > > that the > > > > > > >>>> LogStartOffset in the leader only changes due to > LogCleaner. 
In > > > > > fact the > > > > > > >>>> LogStartOffset can change on the leader due to either log > > > retention > > > > > and > > > > > > >>>> DeleteRecordsRequest. I haven't verified whether LogCleaner > can > > > > > change > > > > > > >>>> LogStartOffset though. It may be a bit better to just say > that a > > > > > > >>>> partition is considered dirty if LogStartOffset changes. > > > > > > >>> I agree. It should be straightforward to just resend the > > > partition > > > > > if > > > > > > >>> logStartOffset changes. > > > > > > >>> > > > > > > >>>>>> 4. In "Fetch Session Caching" section, it is said that > each > > > broker > > > > > > >>> has a > > > > > > >>>>>> limited number of slots. How is this number determined? > Does > > > this > > > > > > >>> require > > > > > > >>>>>> a new broker config for this number? > > > > > > >>>>> Good point. I added two broker configuration parameters to > > > control > > > > > > >>> this > > > > > > >>>>> number. > > > > > > >>>>> > > > > > > >>>> I am curious to see whether we can avoid some of these new > > > configs. > > > > > For > > > > > > >>>> example, incremental.fetch.session.cache.slots.per.broker > is > > > > > probably > > > > > > >>> not > > > > > > >>>> necessary because if a leader knows that a FetchRequest > comes > > > from a > > > > > > >>>> follower, we probably want the leader to always cache the > > > > > information > > > > > > >>>> from that follower. Does this make sense? > > > > > > >>> Yeah, maybe we can avoid having > > > > > > >>> incremental.fetch.session.cache.slots.per.broker. > > > > > > >>> > > > > > > >>>> Maybe we can discuss the config later after there is > agreement > > > on > > > > > how the > > > > > > >>>> protocol would look like. > > > > > > >>>> > > > > > > >>>> > > > > > > >>>>>> What is the error code if broker does > > > > > > >>>>>> not have new log for the incoming FetchRequest? > > > > > > >>>>> Hmm, is there a typo in this question? Maybe you meant to > ask > > > what > > > > > > >>>>> happens if there is no new cache slot for the incoming > > > > > FetchRequest? > > > > > > >>>>> That's not an error-- the incremental fetch session ID just > > > gets > > > > > set to > > > > > > >>>>> 0, indicating no incremental fetch session was created. > > > > > > >>>>> > > > > > > >>>> Yeah there is a typo. You have answered my question. > > > > > > >>>> > > > > > > >>>> > > > > > > >>>>>> 5. Can you clarify what happens if follower adds a > partition > > > to > > > > > the > > > > > > >>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? > Does > > > > > leader > > > > > > >>>>>> needs to generate a new session for this > ReplicaFetcherThread > > > or > > > > > > >>> does it > > > > > > >>>>> re-use > > > > > > >>>>>> the existing session? If it uses a new session, is the > old > > > > > session > > > > > > >>>>>> actively deleted from the slot? > > > > > > >>>>> The basic idea is that you can't make changes, except by > > > sending a > > > > > full > > > > > > >>>>> fetch request. However, perhaps we can allow the client to > > > re-use > > > > > its > > > > > > >>>>> existing session ID. If the client sets sessionId = id, > epoch > > > = > > > > > 0, it > > > > > > >>>>> could re-initialize the session. > > > > > > >>>>> > > > > > > >>>> Yeah I agree with the basic idea. We probably want to > understand > > > > > more > > > > > > >>>> detail about how this works later. > > > > > > >>> Sounds good. I updated the KIP with this information. 
A > > > > > > >>> re-initialization should be exactly the same as an > > > initialization, > > > > > > >>> except that it reuses an existing ID. > > > > > > >>> > > > > > > >>> best, > > > > > > >>> Colin > > > > > > >>> > > > > > > >>> > > > > > > >>>>>> BTW, I think it may be useful if the KIP can include the > > > example > > > > > > >>> workflow > > > > > > >>>>>> of how this feature will be used in case of partition > change > > > and > > > > > so > > > > > > >>> on. > > > > > > >>>>> Yeah, that might help. > > > > > > >>>>> > > > > > > >>>>> best, > > > > > > >>>>> Colin > > > > > > >>>>> > > > > > > >>>>>> Thanks, > > > > > > >>>>>> Dong > > > > > > >>>>>> > > > > > > >>>>>> > > > > > > >>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe< > > > cmcc...@apache.org > > > > > > > > > > > > >>>>>> wrote: > > > > > > >>>>>> > > > > > > >>>>>>> I updated the KIP with the ideas we've been discussing. > > > > > > >>>>>>> > > > > > > >>>>>>> best, > > > > > > >>>>>>> Colin > > > > > > >>>>>>> > > > > > > >>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote: > > > > > > >>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote: > > > > > > >>>>>>>>> Hi Colin, thank you for this KIP, it can become a > really > > > > > > >>> useful > > > > > > >>>>> thing. > > > > > > >>>>>>>>> I just scanned through the discussion so far and > wanted to > > > > > > >>> start a > > > > > > >>>>>>>>> thread to make as decision about keeping the > > > > > > >>>>>>>>> cache with the Connection / Session or having some > sort of > > > UUID > > > > > > >>>>> indN > > > > > > >>>>>>> exed > > > > > > >>>>>>>>> global Map. > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> Sorry if that has been settled already and I missed > it. In > > > this > > > > > > >>>>> case > > > > > > >>>>>>>>> could anyone point me to the discussion? > > > > > > >>>>>>>> Hi Jan, > > > > > > >>>>>>>> > > > > > > >>>>>>>> I don't think anyone has discussed the idea of tying the > > > cache > > > > > > >>> to an > > > > > > >>>>>>>> individual TCP session yet. I agree that since the > cache is > > > > > > >>>>> intended to > > > > > > >>>>>>>> be used only by a single follower or client, it's an > > > interesting > > > > > > >>>>> thing > > > > > > >>>>>>>> to think about. > > > > > > >>>>>>>> > > > > > > >>>>>>>> I guess the obvious disadvantage is that whenever your > TCP > > > > > > >>> session > > > > > > >>>>>>>> drops, you have to make a full fetch request rather > than an > > > > > > >>>>> incremental > > > > > > >>>>>>>> one. It's not clear to me how often this happens in > > > practice -- > > > > > > >>> it > > > > > > >>>>>>>> probably depends a lot on the quality of the network. > From > > > a > > > > > > >>> code > > > > > > >>>>>>>> perspective, it might also be a bit difficult to access > data > > > > > > >>>>> associated > > > > > > >>>>>>>> with the Session from classes like KafkaApis (although > we > > > could > > > > > > >>>>> refactor > > > > > > >>>>>>>> it to make this easier). > > > > > > >>>>>>>> > > > > > > >>>>>>>> It's also clear that even if we tie the cache to the > > > session, we > > > > > > >>>>> still > > > > > > >>>>>>>> have to have limits on the number of caches we're > willing to > > > > > > >>> create. > > > > > > >>>>>>>> And probably we should reserve some cache slots for each > > > > > > >>> follower, so > > > > > > >>>>>>>> that clients don't take all of them. 
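As a sketch of the "reserve some cache slots for each follower" idea, the broker-side accounting might look something like the following; the class name, the reserved-slot notion, and the config it would map to are placeholders rather than anything from the KIP.

    // Illustrative only: consumers may use at most (totalSlots - reservedForFollowers)
    // incremental fetch sessions, so replica fetchers always have room.
    class FetchSessionSlotsSketch {
        private final int totalSlots;             // would come from a broker config
        private final int reservedForFollowers;
        private int followerSessions = 0;
        private int consumerSessions = 0;

        FetchSessionSlotsSketch(int totalSlots, int reservedForFollowers) {
            this.totalSlots = totalSlots;
            this.reservedForFollowers = reservedForFollowers;
        }

        // Returns true if a new incremental fetch session may be created for this
        // client; false means the client keeps using full fetch requests.
        synchronized boolean tryAcquireSlot(boolean isFollower) {
            if (followerSessions + consumerSessions >= totalSlots) {
                return false;   // cache is full (eviction could also be attempted here)
            }
            if (!isFollower && consumerSessions >= totalSlots - reservedForFollowers) {
                return false;   // the remaining slots are kept for replica fetchers
            }
            if (isFollower) {
                followerSessions++;
            } else {
                consumerSessions++;
            }
            return true;
        }

        synchronized void releaseSlot(boolean isFollower) {
            if (isFollower) {
                followerSessions--;
            } else {
                consumerSessions--;
            }
        }
    }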
> > > > > > >>>>>>>> > > > > > > >>>>>>>>> Id rather see a protocol in which the client is > hinting the > > > > > > >>> broker > > > > > > >>>>>>> that, > > > > > > >>>>>>>>> he is going to use the feature instead of a client > > > > > > >>>>>>>>> realizing that the broker just offered the feature > > > (regardless > > > > > > >>> of > > > > > > >>>>>>>>> protocol version which should only indicate that the > > > feature > > > > > > >>>>>>>>> would be usable). > > > > > > >>>>>>>> Hmm. I'm not sure what you mean by "hinting." I do > think > > > that > > > > > > >>> the > > > > > > >>>>>>>> server should have the option of not accepting > incremental > > > > > > >>> requests > > > > > > >>>>> from > > > > > > >>>>>>>> specific clients, in order to save memory space. > > > > > > >>>>>>>> > > > > > > >>>>>>>>> This seems to work better with a per > > > > > > >>>>>>>>> connection/session attached Metadata than with a Map > and > > > could > > > > > > >>>>> allow > > > > > > >>>>>>> for > > > > > > >>>>>>>>> easier client implementations. > > > > > > >>>>>>>>> It would also make Client-side code easier as there > > > wouldn't > > > > > > >>> be any > > > > > > >>>>>>>>> Cache-miss error Messages to handle. > > > > > > >>>>>>>> It is nice not to have to handle cache-miss responses, I > > > agree. > > > > > > >>>>>>>> However, TCP sessions aren't exposed to most of our > > > client-side > > > > > > >>> code. > > > > > > >>>>>>>> For example, when the Producer creates a message and > hands > > > it > > > > > > >>> off to > > > > > > >>>>> the > > > > > > >>>>>>>> NetworkClient, the NC will transparently re-connect and > > > re-send > > > > > a > > > > > > >>>>>>>> message if the first send failed. The higher-level code > > > will > > > > > > >>> not be > > > > > > >>>>>>>> informed about whether the TCP session was > re-established, > > > > > > >>> whether an > > > > > > >>>>>>>> existing TCP session was used, and so on. So overall I > > > would > > > > > > >>> still > > > > > > >>>>> lean > > > > > > >>>>>>>> towards not coupling this to the TCP session... > > > > > > >>>>>>>> > > > > > > >>>>>>>> best, > > > > > > >>>>>>>> Colin > > > > > > >>>>>>>> > > > > > > >>>>>>>>> Thank you again for the KIP. And again, if this was > > > > > clarified > > > > > > >>>>> already > > > > > > >>>>>>>>> please drop me a hint where I could read about it. > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> Best Jan > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> > > > > > > >>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote: > > > > > > >>>>>>>>>> Hi all, > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> I created a KIP to improve the scalability and > latency of > > > > > > >>>>>>> FetchRequest: > > > > > > >>>>>>>>>> https://cwiki.apache.org/ > confluence/display/KAFKA/KIP- > > > > > > >>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+ > > > > > > >>>>>>> Partition+Scalability > > > > > > >>>>>>>>>> Please take a look. > > > > > > >>>>>>>>>> > > > > > > >>>>>>>>>> cheers, > > > > > > >>>>>>>>>> Colin > > > > > > > > > > > > > > >
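Finally, pulling the session ID / epoch discussion together, a leader-side check might look roughly like the sketch below: a lost or reordered response shows up as an epoch mismatch, sessionId = 0 means a full fetch, and sessionId = id with epoch = 0 re-initializes an existing session. The names and outcomes are illustrative; real handling (error codes, cache limits, concurrency) would be more involved.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative leader-side validation of (sessionId, epoch); not the KIP's code.
    class FetchSessionValidatorSketch {
        enum Outcome { FULL_FETCH, SESSION_NOT_FOUND, REINITIALIZED, EPOCH_MISMATCH, CONTINUED }

        private static class Session {
            int expectedEpoch = 1;   // epoch the next incremental request should carry
        }

        private final Map<Long, Session> sessions = new HashMap<>();
        private long nextSessionId = 1;

        synchronized Outcome handle(long sessionId, int epoch) {
            if (sessionId == 0) {
                // Full fetch. Subject to the cache limits discussed above, the broker
                // may create a new session and return its id (or 0 if it declines).
                sessions.put(nextSessionId++, new Session());
                return Outcome.FULL_FETCH;
            }
            Session session = sessions.get(sessionId);
            if (session == null) {
                // Expired or evicted: the client falls back to a full fetch.
                return Outcome.SESSION_NOT_FOUND;
            }
            if (epoch == 0) {
                // Client reuses its existing id but starts the session over.
                session.expectedEpoch = 1;
                return Outcome.REINITIALIZED;
            }
            if (epoch != session.expectedEpoch) {
                // The follower did not see the previous response (lost, delayed, or
                // duplicated request). This is exactly what the epoch is there to catch.
                return Outcome.EPOCH_MISMATCH;
            }
            session.expectedEpoch++;
            return Outcome.CONTINUED;
        }
    }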