On Tue, Dec 19, 2017, at 02:16, Jan Filipiak wrote:
> Sorry for coming back at this so late.
> 
> 
> 
> On 11.12.2017 07:12, Colin McCabe wrote:
> > On Sun, Dec 10, 2017, at 22:10, Colin McCabe wrote:
> >> On Fri, Dec 8, 2017, at 01:16, Jan Filipiak wrote:
> >>> Hi,
> >>>
> >>> sorry for the late reply, busy times :-/
> >>>
> >>> I would ask you one thing maybe. Since the timeout
> >>> argument seems to be settled I have no further argument
> >>> form your side except the "i don't want to".
> >>>
> >>> Can you see that connection.max.idle.max is the exact time
> >>> that expresses "We expect the client to be away for this long,
> >>> and come back and continue"?
> >> Hi Jan,
> >>
> >> Sure, connection.max.idle.max is the exact time that we want to keep
> >> around a TCP session.  TCP sessions are relatively cheap, so we can
> >> afford to keep them around for 10 minutes by default.  Incremental fetch
> >> state is less cheap, so we want to set a shorter timeout for it.  We
> >> also want new TCP sessions to be able to reuse an existing incremental
> >> fetch session rather than creating a new one and waiting for the old one
> >> to time out.
> >>
> >>> also clarified some stuff inline
> >>>
> >>> Best Jan
> >>>
> >>>
> >>>
> >>>
> >>> On 05.12.2017 23:14, Colin McCabe wrote:
> >>>> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> >>>>> Hi Colin
> >>>>>
> >>>>> Addressing the topic of how to manage slots from the other thread.
> >>>>> With tcp connections all this comes for free essentially.
> >>>> Hi Jan,
> >>>>
> >>>> I don't think that it's accurate to say that cache management "comes for
> >>>> free" by coupling the incremental fetch session with the TCP session.
> >>>> When a new TCP session is started by a fetch request, you still have to
> >>>> decide whether to grant that request an incremental fetch session or
> >>>> not.  If your answer is that you always grant the request, I would argue
> >>>> that you do not have cache management.
> >>> First I would say, the client has a big say in this. If the client
> >>> is not going to issue incremental he shouldn't ask for a cache
> >>> when the client ask for the cache we still have all options to deny.
> >> To put it simply, we have to have some cache management above and beyond
> >> just giving out an incremental fetch session to anyone who has a TCP
> >> session.  Therefore, caching does not become simpler if you couple the
> >> fetch session to the TCP session.
> Simply giving out an fetch session for everyone with a connection is too 
> simple,
> but I think it plays well into the idea of consumers choosing to use the 
> feature
> therefore only enabling where it brings maximum gains 
> (replicas,MirrorMakers)
> >>
> >>>> I guess you could argue that timeouts are cache management, but I don't
> >>>> find that argument persuasive.  Anyone could just create a lot of TCP
> >>>> sessions and use a lot of resources, in that case.  So there is
> >>>> essentially no limit on memory use.  In any case, TCP sessions don't
> >>>> help us implement fetch session timeouts.
> >>> We still have all the options denying the request to keep the state.
> >>> What you want seems like a max connections / ip safeguard.
> >>> I can currently take down a broker with to many connections easily.
> >>>
> >>>
> >>>>> I still would argue we disable it by default and make a flag in the
> >>>>> broker to ask the leader to maintain the cache while replicating and 
> >>>>> also only
> >>>>> have it optional in consumers (default to off) so one can turn it on
> >>>>> where it really hurts.  MirrorMaker and audit consumers prominently.
> >>>> I agree with Jason's point from earlier in the thread.  Adding extra
> >>>> configuration knobs that aren't really necessary can harm usability.
> >>>> Certainly asking people to manually turn on a feature "where it really
> >>>> hurts" seems to fall in that category, when we could easily enable it
> >>>> automatically for them.
> >>> This doesn't make much sense to me.
> >> There are no tradeoffs to think about from the client's point of view:
> >> it always wants an incremental fetch session.  So there is no benefit to
> >> making the clients configure an extra setting.  Updating and managing
> >> client configurations is also more difficult than managing broker
> >> configurations for most users.
> >>
> >>> You also wanted to implement
> >>> a "turn of in case of bug"-knob. Having the client indicate if the
> >>> feauture will be used seems reasonable to me.,
> >> True.  However, if there is a bug, we could also roll back the client,
> >> so having this configuration knob is not strictly required.
> >>
> >>>>> Otherwise I left a few remarks in-line, which should help to understand
> >>>>> my view of the situation better
> >>>>>
> >>>>> Best Jan
> >>>>>
> >>>>>
> >>>>> On 05.12.2017 08:06, Colin McCabe wrote:
> >>>>>> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> >>>>>>> On 03.12.2017 21:55, Colin McCabe wrote:
> >>>>>>>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> >>>>>>>>> Thanks for the explanation, Colin. A few more questions.
> >>>>>>>>>
> >>>>>>>>>> The session epoch is not complex.  It's just a number which 
> >>>>>>>>>> increments
> >>>>>>>>>> on each incremental fetch.  The session epoch is also useful for
> >>>>>>>>>> debugging-- it allows you to match up requests and responses when
> >>>>>>>>>> looking at log files.
> >>>>>>>>> Currently each request in Kafka has a correlation id to help match 
> >>>>>>>>> the
> >>>>>>>>> requests and responses. Is epoch doing something differently?
> >>>>>>>> Hi Becket,
> >>>>>>>>
> >>>>>>>> The correlation ID is used within a single TCP session, to uniquely
> >>>>>>>> associate a request with a response.  The correlation ID is not 
> >>>>>>>> unique
> >>>>>>>> (and has no meaning) outside the context of that single TCP session.
> >>>>>>>>
> >>>>>>>> Keep in mind, NetworkClient is in charge of TCP sessions, and 
> >>>>>>>> generally
> >>>>>>>> tries to hide that information from the upper layers of the code.  So
> >>>>>>>> when you submit a request to NetworkClient, you don't know if that
> >>>>>>>> request creates a TCP session, or reuses an existing one.
> >>>>>>>>>> Unfortunately, this doesn't work.  Imagine the client misses an
> >>>>>>>>>> increment fetch response about a partition.  And then the 
> >>>>>>>>>> partition is
> >>>>>>>>>> never updated after that.  The client has no way to know about the
> >>>>>>>>>> partition, since it won't be included in any future incremental 
> >>>>>>>>>> fetch
> >>>>>>>>>> responses.  And there are no offsets to compare, since the 
> >>>>>>>>>> partition is
> >>>>>>>>>> simply omitted from the response.
> >>>>>>>>> I am curious about in which situation would the follower miss a 
> >>>>>>>>> response
> >>>>>>>>> of a partition. If the entire FetchResponse is lost (e.g. timeout), 
> >>>>>>>>> the
> >>>>>>>>> follower would disconnect and retry. That will result in sending a 
> >>>>>>>>> full
> >>>>>>>>> FetchRequest.
> >>>>>>>> Basically, you are proposing that we rely on TCP for reliable 
> >>>>>>>> delivery
> >>>>>>>> in a distributed system.  That isn't a good idea for a bunch of
> >>>>>>>> different reasons.  First of all, TCP timeouts tend to be very long. 
> >>>>>>>>  So
> >>>>>>>> if the TCP session timing out is your error detection mechanism, you
> >>>>>>>> have to wait minutes for messages to timeout.  Of course, we add a
> >>>>>>>> timeout on top of that after which we declare the connection bad and
> >>>>>>>> manually close it.  But just because the session is closed on one end
> >>>>>>>> doesn't mean that the other end knows that it is closed.  So the 
> >>>>>>>> leader
> >>>>>>>> may have to wait quite a long time before TCP decides that yes,
> >>>>>>>> connection X from the follower is dead and not coming back, even 
> >>>>>>>> though
> >>>>>>>> gremlins ate the FIN packet which the follower attempted to 
> >>>>>>>> translate.
> >>>>>>>> If the cache state is tied to that TCP session, we have to keep that
> >>>>>>>> cache around for a much longer time than we should.
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I see this from a different perspective. The cache expiry time
> >>>>>>> has the same semantic as idle connection time in this scenario.
> >>>>>>> It is the time range we expect the client to come back an reuse
> >>>>>>> its broker side state. I would argue that on close we would get an
> >>>>>>> extra shot at cleaning up the session state early. As opposed to
> >>>>>>> always wait for that duration for expiry to happen.
> >>>>>> Hi Jan,
> >>>>>>
> >>>>>> The idea here is that the incremental fetch cache expiry time can be
> >>>>>> much shorter than the TCP session timeout.  In general the TCP session
> >>>>>> timeout is common to all TCP connections, and very long.  To make these
> >>>>>> numbers a little more concrete, the TCP session timeout is often
> >>>>>> configured to be 2 hours on Linux.  (See
> >>>>>> https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
> >>>>>> )  The timeout I was proposing for incremental fetch sessions was one 
> >>>>>> or
> >>>>>> two minutes at most.
> >>>>> Currently this is taken care of by
> >>>>> connections.max.idle.ms on the broker and defaults to something of few
> >>>>> minutes.
> >>>> It is 10 minutes by default, which is longer than what we want the
> >>>> incremental fetch session timeout to be.  There's no reason to couple
> >>>> these two things.
> >>>>
> >>>>> Also something we could let the client change if we really wanted to.
> >>>>> So there is no need to worry about coupling our implementation to some
> >>>>> timeouts given by the OS, with TCP one always has full control over the 
> >>>>> worst
> >>>>> times + one gets the extra shot cleaning up early when the close comes 
> >>>>> through.
> >>>>> Which is the majority of the cases.
> >>>> In the majority of cases, the TCP session will be re-established.  In
> >>>> that case, we have to send a full fetch request rather than an
> >>>> incremental fetch request.
> >>> I actually have a hard time believing this. Do you have any numbers of
> >>> any existing production system? Is it the virtualisation layer cutting
> >>> all the connections?
> >>> We see this only on application crashes and restarts where the app needs
> >>> todo the full anyways
> >>> as it probably continues with stores offsets.
> >> Yes, TCP connections get dropped.  It happens very often in production
> >> clusters, actually.  When I was working on Hadoop, one of the most
> >> common questions I heard from newcomers was "why do I see so many
> >> EOFException messages in the logs"?  The other thing that happens a lot
> >> is DNS outages or slowness.  Public clouds seem to have even more
> >> unstable networks than the on-premise clusters.  I am not sure why that
> >> is.
> Hadoop has a wiki page on exactly this
> https://wiki.apache.org/hadoop/EOFException
> 
> besides user errors they have servers crashing and actually loss of 
> connection high on their list.
> In the case of "server goes away" the cache goes with it. So nothing to 
> argue about the cache beeing reused by
> a new connection.
> 
> Can you make an argument at which point the epoch would be updated 
> broker side to maximise re-usage of the cache on
> lost connections. In many cases the epoch would go out of sync and we 
> would need a full fetch anyways. Am I mistaken here?

The current proposal is that the server can accept multiple requests in a row 
with the same sequence number.

Colin

> 
> 
> 
> 
> >>
> >>>>>>>> Secondly, from a software engineering perspective, it's not a good 
> >>>>>>>> idea
> >>>>>>>> to try to tightly tie together TCP and our code.  We would have to
> >>>>>>>> rework how we interact with NetworkClient so that we are aware of 
> >>>>>>>> things
> >>>>>>>> like TCP sessions closing or opening.  We would have to be careful
> >>>>>>>> preserve the ordering of incoming messages when doing things like
> >>>>>>>> putting incoming requests on to a queue to be processed by multiple
> >>>>>>>> threads.  It's just a lot of complexity to add, and there's no 
> >>>>>>>> upside.
> >>>>>>> I see the point here. And I had a small chat with Dong Lin already
> >>>>>>> making me aware of this. I tried out the approaches and propose the
> >>>>>>> following:
> >>>>>>>
> >>>>>>> The client start and does a full fetch. It then does incremental 
> >>>>>>> fetches.
> >>>>>>> The connection to the broker dies and is re-established by 
> >>>>>>> NetworkClient
> >>>>>>> under the hood.
> >>>>>>> The broker sees an incremental fetch without having state => returns
> >>>>>>> error:
> >>>>>>> Client sees the error, does a full fetch and goes back to 
> >>>>>>> incrementally
> >>>>>>> fetching.
> >>>>>>>
> >>>>>>> having this 1 additional error round trip is essentially the same as
> >>>>>>> when something
> >>>>>>> with the sessions or epoch changed unexpectedly to the client (say
> >>>>>>> expiry).
> >>>>>>>
> >>>>>>> So its nothing extra added but the conditions are easier to evaluate.
> >>>>>>> Especially since we do everything with NetworkClient. Other 
> >>>>>>> implementers
> >>>>>>> on the
> >>>>>>> protocol are free to optimizes this and do not do the errornours
> >>>>>>> roundtrip on the
> >>>>>>> new connection.
> >>>>>>> Its a great plus that the client can know when the error is gonna
> >>>>>>> happen. instead of
> >>>>>>> the server to always have to report back if something changes
> >>>>>>> unexpectedly for the client
> >>>>>> You are assuming that the leader and the follower agree that the TCP
> >>>>>> session drops at the same time.  When there are network problems, this
> >>>>>> may not be true.  The leader may still think the previous TCP session 
> >>>>>> is
> >>>>>> active.  In that case, we have to keep the incremental fetch session
> >>>>>> state around until we learn otherwise (which could be up to that 2 hour
> >>>>>> timeout I mentioned).  And if we get a new incoming incremental fetch
> >>>>>> request, we can't assume that it replaces the previous one, because the
> >>>>>> IDs will be different (the new one starts a new session).
> >>>>> As mentioned, no reason to fear some time-outs out of our control
> >>>>>>>> Imagine that I made an argument that client IDs are "complex" and 
> >>>>>>>> should
> >>>>>>>> be removed from our APIs.  After all, we can just look at the remote 
> >>>>>>>> IP
> >>>>>>>> address and TCP port of each connection.  Would you think that was a
> >>>>>>>> good idea?  The client ID is useful when looking at logs.  For 
> >>>>>>>> example,
> >>>>>>>> if a rebalance is having problems, you want to know what clients were
> >>>>>>>> having a problem.  So having the client ID field to guide you is
> >>>>>>>> actually much less "complex" in practice than not having an ID.
> >>>>>>> I still cant follow why the correlation idea will not help here.
> >>>>>>> Correlating logs with it usually works great. Even with primitive 
> >>>>>>> tools
> >>>>>>> like grep
> >>>>>> The correlation ID does help somewhat, but certainly not as much as a
> >>>>>> unique 64-bit ID.  The correlation ID is not unique in the broker, just
> >>>>>> unique to a single NetworkClient.  Simiarly, the correlation ID is not
> >>>>>> unique on the client side, if there are multiple Consumers, etc.
> >>>>> Can always bump entropy in correlation IDs, never had a problem
> >>>>> of finding to many duplicates. Would be a different KIP though.
> >>>>>>>> Similarly, if metadata responses had epoch numbers (simple 
> >>>>>>>> incrementing
> >>>>>>>> numbers), we would not have to debug problems like clients 
> >>>>>>>> accidentally
> >>>>>>>> getting old metadata from servers that had been partitioned off from 
> >>>>>>>> the
> >>>>>>>> network for a while.  Clients would know the difference between old 
> >>>>>>>> and
> >>>>>>>> new metadata.  So putting epochs in to the metadata request is much 
> >>>>>>>> less
> >>>>>>>> "complex" operationally, even though it's an extra field in the 
> >>>>>>>> request.
> >>>>>>>>      This has been discussed before on the mailing list.
> >>>>>>>>
> >>>>>>>> So I think the bottom line for me is that having the session ID and
> >>>>>>>> session epoch, while it adds two extra fields, reduces operational
> >>>>>>>> complexity and increases debuggability.  It avoids tightly coupling 
> >>>>>>>> us
> >>>>>>>> to assumptions about reliable ordered delivery which tend to be 
> >>>>>>>> violated
> >>>>>>>> in practice in multiple layers of the stack.  Finally, it  avoids the
> >>>>>>>> necessity of refactoring NetworkClient.
> >>>>>>> So there is stacks out there that violate TCP guarantees? And software
> >>>>>>> still works? How can this be? Can you elaborate a little where this
> >>>>>>> can be violated? I am not very familiar with virtualized environments
> >>>>>>> but they can't really violate TCP contracts.
> >>>>>> TCP's guarantees of reliable, in-order transmission certainly can be
> >>>>>> violated.  For example, I once had to debug a cluster where a certain
> >>>>>> node had a network card which corrupted its transmissions occasionally.
> >>>>>> With all the layers of checksums, you would think that this was not
> >>>>>> possible, but it happened.  We occasionally got corrupted data written
> >>>>>> to disk on the other end because of it.  Even more frustrating, the 
> >>>>>> data
> >>>>>> was not corrupted on disk on the sending node-- it was a bug in the
> >>>>>> network card driver that was injecting the errors.
> >>>>> true, but your broker might aswell read a corrupted 600GB as size from
> >>>>> the network and die with OOM instantly.
> >>>> If you read 600 GB as the size from the network, you will not "die with
> >>>> OOM instantly."  That would be a bug.  Instead, you will notice that 600
> >>>> GB is greater than max.message.bytes, and close the connection.
> >>> We only check max.message.bytes to late to guard against consumer
> >>> stalling.
> >>> we dont have a notion of max.networkpacket.size before we allocate the
> >>> bytebuffer to read it into.
> >> "network packets" are not the same thing as "kafka RPCs."  One Kafka RPC
> >> could take up mutiple ethernet packets.
> >>
> >> Also, max.message.bytes has nothing to do with "consumer stalling" --
> >> you are probably thinking about some of the fetch request
> >> configurations.  max.message.bytes is used by the RPC system to figure
> >> out whether to read the full incoming RP
> > Whoops, this is incorrect.  I was thinking about
> > "socket.request.max.bytes" rather than "max.message.bytes."  Sorry about
> > that.  See Ismael's email as well.
> >
> > best,
> > Colin
> >
> >> best,
> >> Colin
> >>
> >>>>> Optimizing for still having functional
> >>>>> software under this circumstances is not reasonable.
> >>>>> You want to get rid of such a
> >>>>> node ASAP and pray that zookeepers ticks get corrupted often enough
> >>>>> that it finally drops out of the cluster.
> >>>>>
> >>>>> There is a good reason that these kinda things
> >>>>> https://issues.apache.org/jira/browse/MESOS-4105
> >>>>> don't end up as kafka Jiras. In the end you can't run any software in
> >>>>> these containers anymore. Application layer checksums are a neat thing 
> >>>>> to
> >>>>> fail fast but trying to cope with this probably causes more bad than
> >>>>> good.  So I would argue that we shouldn't try this for the fetch 
> >>>>> requests.
> >>>> One of the goals of Apache Kafka is to be "a streaming platform...
> >>>> [that] lets you store streams of records in a fault-tolerant way."  For
> >>>> more information, see https://kafka.apache.org/intro .  Fault-tolerance
> >>>> is explicitly part of the goal of Kafka.  Prayer should be optional, not
> >>>> required, when running the software.
> >>> Yes, we need to fail ASAP when we read corrupted packages. It seemed
> >>> to me like you tried to make the case for pray and try to stay alive.
> >>> Fault
> >>> tolerance here means. I am a fishy box i am going to let a good box
> >>> handle
> >>> it and be silent until i get fixed up.
> >>>> Crashing because someone sent you a bad packet is not reasonable
> >>>> behavior.  It is a bug.  Similarly, bringing down the whole cluster,
> >>>> which could a hundred nodes, because someone had a bad network adapter
> >>>> is not reasonable behavior.  It is perhaps reasonable for the cluster to
> >>>> perform worse when hardware is having problems.  But that's a different
> >>>> discussion.
> >>> See above.
> >>>> best,
> >>>> Colin
> >>>>
> >>>>>> However, my point was not about TCP's guarantees being violated.  My
> >>>>>> point is that TCP's guarantees are only one small building block to
> >>>>>> build a robust distributed system.  TCP basically just says that if you
> >>>>>> get any bytes from the stream, you will get the ones that were sent by
> >>>>>> the sender, in the order they were sent.  TCP does not guarantee that
> >>>>>> the bytes you send will get there.  It does not guarantee that if you
> >>>>>> close the connection, the other end will know about it in a timely
> >>>>>> fashion.
> >>>>> These are very powerful grantees and since we use TCP we should
> >>>>> piggy pack everything that is reasonable on to it. IMO there is no
> >>>>> need to reimplement correct sequencing again if you get that from
> >>>>> your transport layer. It saves you the complexity, it makes
> >>>>> you application behave way more naturally and your api easier to
> >>>>> understand.
> >>>>>
> >>>>> There is literally nothing the Kernel wont let you decide
> >>>>> especially not any timings. Only noticeable exception being TIME_WAIT
> >>>>> of usually 240 seconds but that already has little todo with the broker
> >>>>> itself and
> >>>>> if we are running out of usable ports because of this then expiring
> >>>>> fetch requests
> >>>>> wont help much anyways.
> >>>>>
> >>>>> I hope I could strengthen the trust you have in userland TCP connection
> >>>>> management. It is really powerful and can be exploited for maximum gains
> >>>>> without much risk in my opinion.
> >>>>>
> >>>>>
> >>>>>
> >>>>>> It does not guarantee that the bytes will be received in a
> >>>>>> certain timeframe, and certainly doesn't guarantee that if you send a
> >>>>>> byte on connection X and then on connection Y, that the remote end will
> >>>>>> read a byte on X before reading a byte on Y.
> >>>>> Noone expects this from two independent paths of any kind.
> >>>>>
> >>>>>> best,
> >>>>>> Colin
> >>>>>>
> >>>>>>> Hope this made my view clearer, especially the first part.
> >>>>>>>
> >>>>>>> Best Jan
> >>>>>>>
> >>>>>>>
> >>>>>>>> best,
> >>>>>>>> Colin
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>> If there is an error such as NotLeaderForPartition is
> >>>>>>>>> returned for some partitions, the follower can always send a full
> >>>>>>>>> FetchRequest. Is there a scenario that only some of the partitions 
> >>>>>>>>> in a
> >>>>>>>>> FetchResponse is lost?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<cmcc...@apache.org>  
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin wrote:
> >>>>>>>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin McCabe<cmcc...@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin wrote:
> >>>>>>>>>>>>> Hey Colin,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks much for the update. I have a few questions below:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. I am not very sure that we need Fetch Session Epoch. It 
> >>>>>>>>>>>>> seems that
> >>>>>>>>>>>>> Fetch
> >>>>>>>>>>>>> Session Epoch is only needed to help leader distinguish between 
> >>>>>>>>>>>>> "a
> >>>>>>>>>> full
> >>>>>>>>>>>>> fetch request" and "a full fetch request and request a new
> >>>>>>>>>> incremental
> >>>>>>>>>>>>> fetch session". Alternatively, follower can also indicate "a 
> >>>>>>>>>>>>> full
> >>>>>>>>>> fetch
> >>>>>>>>>>>>> request and request a new incremental fetch session" by setting 
> >>>>>>>>>>>>> Fetch
> >>>>>>>>>>>>> Session ID to -1 without using Fetch Session Epoch. Does this 
> >>>>>>>>>>>>> make
> >>>>>>>>>> sense?
> >>>>>>>>>>>> Hi Dong,
> >>>>>>>>>>>>
> >>>>>>>>>>>> The fetch session epoch is very important for ensuring 
> >>>>>>>>>>>> correctness.  It
> >>>>>>>>>>>> prevents corrupted or incomplete fetch data due to network 
> >>>>>>>>>>>> reordering
> >>>>>>>>>> or
> >>>>>>>>>>>> loss.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For example, consider a scenario where the follower sends a fetch
> >>>>>>>>>>>> request to the leader.  The leader responds, but the response is 
> >>>>>>>>>>>> lost
> >>>>>>>>>>>> because of network problems which affected the TCP session.  In 
> >>>>>>>>>>>> that
> >>>>>>>>>>>> case, the follower must establish a new TCP session and re-send 
> >>>>>>>>>>>> the
> >>>>>>>>>>>> incremental fetch request.  But the leader does not know that the
> >>>>>>>>>>>> follower didn't receive the previous incremental fetch response. 
> >>>>>>>>>>>>  It is
> >>>>>>>>>>>> only the incremental fetch epoch which lets the leader know that 
> >>>>>>>>>>>> it
> >>>>>>>>>>>> needs to resend that data, and not data which comes afterwards.
> >>>>>>>>>>>>
> >>>>>>>>>>>> You could construct similar scenarios with message reordering,
> >>>>>>>>>>>> duplication, etc.  Basically, this is a stateful protocol on an
> >>>>>>>>>>>> unreliable network, and you need to know whether the follower 
> >>>>>>>>>>>> got the
> >>>>>>>>>>>> previous data you sent before you move on.  And you need to 
> >>>>>>>>>>>> handle
> >>>>>>>>>>>> issues like duplicated or delayed requests.  These issues do not 
> >>>>>>>>>>>> affect
> >>>>>>>>>>>> the full fetch request, because it is not stateful-- any full 
> >>>>>>>>>>>> fetch
> >>>>>>>>>>>> request can be understood and properly responded to in isolation.
> >>>>>>>>>>>>
> >>>>>>>>>>> Thanks for the explanation. This makes sense. On the other hand I 
> >>>>>>>>>>> would
> >>>>>>>>>>> be interested in learning more about whether Becket's solution 
> >>>>>>>>>>> can help
> >>>>>>>>>>> simplify the protocol by not having the echo field and whether 
> >>>>>>>>>>> that is
> >>>>>>>>>>> worth doing.
> >>>>>>>>>> Hi Dong,
> >>>>>>>>>>
> >>>>>>>>>> I commented about this in the other thread.  A solution which 
> >>>>>>>>>> doesn't
> >>>>>>>>>> maintain session information doesn't work here.
> >>>>>>>>>>
> >>>>>>>>>>>>> 2. It is said that Incremental FetchRequest will include 
> >>>>>>>>>>>>> partitions
> >>>>>>>>>> whose
> >>>>>>>>>>>>> fetch offset or maximum number of fetch bytes has been changed. 
> >>>>>>>>>>>>> If
> >>>>>>>>>>>>> follower's logStartOffet of a partition has changed, should this
> >>>>>>>>>>>>> partition also be included in the next FetchRequest to the 
> >>>>>>>>>>>>> leader?
> >>>>>>>>>>>> Otherwise, it
> >>>>>>>>>>>>> may affect the handling of DeleteRecordsRequest because leader 
> >>>>>>>>>>>>> may
> >>>>>>>>>> not
> >>>>>>>>>>>> know
> >>>>>>>>>>>>> the corresponding data has been deleted on the follower.
> >>>>>>>>>>>> Yeah, the follower should include the partition if the 
> >>>>>>>>>>>> logStartOffset
> >>>>>>>>>>>> has changed.  That should be spelled out on the KIP.  Fixed.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 3. In the section "Per-Partition Data", a partition is not 
> >>>>>>>>>>>>> considered
> >>>>>>>>>>>>> dirty if its log start offset has changed. Later in the section
> >>>>>>>>>>>> "FetchRequest
> >>>>>>>>>>>>> Changes", it is said that incremental fetch responses will 
> >>>>>>>>>>>>> include a
> >>>>>>>>>>>>> partition if its logStartOffset has changed. It seems 
> >>>>>>>>>>>>> inconsistent.
> >>>>>>>>>> Can
> >>>>>>>>>>>>> you update the KIP to clarify it?
> >>>>>>>>>>>>>
> >>>>>>>>>>>> In the "Per-Partition Data" section, it does say that 
> >>>>>>>>>>>> logStartOffset
> >>>>>>>>>>>> changes make a partition dirty, though, right?  The first bullet 
> >>>>>>>>>>>> point
> >>>>>>>>>>>> is:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> * The LogCleaner deletes messages, and this changes the log 
> >>>>>>>>>>>>> start
> >>>>>>>>>> offset
> >>>>>>>>>>>> of the partition on the leader., or
> >>>>>>>>>>>>
> >>>>>>>>>>> Ah I see. I think I didn't notice this because statement assumes 
> >>>>>>>>>>> that the
> >>>>>>>>>>> LogStartOffset in the leader only changes due to LogCleaner. In 
> >>>>>>>>>>> fact the
> >>>>>>>>>>> LogStartOffset can change on the leader due to either log 
> >>>>>>>>>>> retention and
> >>>>>>>>>>> DeleteRecordsRequest. I haven't verified whether LogCleaner can 
> >>>>>>>>>>> change
> >>>>>>>>>>> LogStartOffset though. It may be a bit better to just say that a
> >>>>>>>>>>> partition is considered dirty if LogStartOffset changes.
> >>>>>>>>>> I agree.  It should be straightforward to just resend the 
> >>>>>>>>>> partition if
> >>>>>>>>>> logStartOffset changes.
> >>>>>>>>>>
> >>>>>>>>>>>>> 4. In "Fetch Session Caching" section, it is said that each 
> >>>>>>>>>>>>> broker
> >>>>>>>>>> has a
> >>>>>>>>>>>>> limited number of slots. How is this number determined? Does 
> >>>>>>>>>>>>> this
> >>>>>>>>>> require
> >>>>>>>>>>>>> a new broker config for this number?
> >>>>>>>>>>>> Good point.  I added two broker configuration parameters to 
> >>>>>>>>>>>> control
> >>>>>>>>>> this
> >>>>>>>>>>>> number.
> >>>>>>>>>>>>
> >>>>>>>>>>> I am curious to see whether we can avoid some of these new 
> >>>>>>>>>>> configs. For
> >>>>>>>>>>> example, incremental.fetch.session.cache.slots.per.broker is 
> >>>>>>>>>>> probably
> >>>>>>>>>> not
> >>>>>>>>>>> necessary because if a leader knows that a FetchRequest comes 
> >>>>>>>>>>> from a
> >>>>>>>>>>> follower, we probably want the leader to always cache the 
> >>>>>>>>>>> information
> >>>>>>>>>>> from that follower. Does this make sense?
> >>>>>>>>>> Yeah, maybe we can avoid having
> >>>>>>>>>> incremental.fetch.session.cache.slots.per.broker.
> >>>>>>>>>>
> >>>>>>>>>>> Maybe we can discuss the config later after there is agreement on 
> >>>>>>>>>>> how the
> >>>>>>>>>>> protocol would look like.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>>> What is the error code if broker does
> >>>>>>>>>>>>> not have new log for the incoming FetchRequest?
> >>>>>>>>>>>> Hmm, is there a typo in this question?  Maybe you meant to ask 
> >>>>>>>>>>>> what
> >>>>>>>>>>>> happens if there is no new cache slot for the incoming 
> >>>>>>>>>>>> FetchRequest?
> >>>>>>>>>>>> That's not an error-- the incremental fetch session ID just gets 
> >>>>>>>>>>>> set to
> >>>>>>>>>>>> 0, indicating no incremental fetch session was created.
> >>>>>>>>>>>>
> >>>>>>>>>>> Yeah there is a typo. You have answered my question.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>>> 5. Can you clarify what happens if follower adds a partition to 
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> ReplicaFetcherThread after receiving LeaderAndIsrRequest? Does 
> >>>>>>>>>>>>> leader
> >>>>>>>>>>>>> needs to generate a new session for this ReplicaFetcherThread or
> >>>>>>>>>> does it
> >>>>>>>>>>>> re-use
> >>>>>>>>>>>>> the existing session?  If it uses a new session, is the old 
> >>>>>>>>>>>>> session
> >>>>>>>>>>>>> actively deleted from the slot?
> >>>>>>>>>>>> The basic idea is that you can't make changes, except by sending 
> >>>>>>>>>>>> a full
> >>>>>>>>>>>> fetch request.  However, perhaps we can allow the client to 
> >>>>>>>>>>>> re-use its
> >>>>>>>>>>>> existing session ID.  If the client sets sessionId = id, epoch = 
> >>>>>>>>>>>> 0, it
> >>>>>>>>>>>> could re-initialize the session.
> >>>>>>>>>>>>
> >>>>>>>>>>> Yeah I agree with the basic idea. We probably want to understand 
> >>>>>>>>>>> more
> >>>>>>>>>>> detail about how this works later.
> >>>>>>>>>> Sounds good.  I updated the KIP with this information.  A
> >>>>>>>>>> re-initialization should be exactly the same as an initialization,
> >>>>>>>>>> except that it reuses an existing ID.
> >>>>>>>>>>
> >>>>>>>>>> best,
> >>>>>>>>>> Colin
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>>> BTW, I think it may be useful if the KIP can include the example
> >>>>>>>>>> workflow
> >>>>>>>>>>>>> of how this feature will be used in case of partition change 
> >>>>>>>>>>>>> and so
> >>>>>>>>>> on.
> >>>>>>>>>>>> Yeah, that might help.
> >>>>>>>>>>>>
> >>>>>>>>>>>> best,
> >>>>>>>>>>>> Colin
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Dong
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin 
> >>>>>>>>>>>>> McCabe<cmcc...@apache.org>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I updated the KIP with the ideas we've been discussing.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
> >>>>>>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
> >>>>>>>>>>>>>>>> Hi Colin, thank you  for this KIP, it can become a really
> >>>>>>>>>> useful
> >>>>>>>>>>>> thing.
> >>>>>>>>>>>>>>>> I just scanned through the discussion so far and wanted to
> >>>>>>>>>> start a
> >>>>>>>>>>>>>>>> thread to make as decision about keeping the
> >>>>>>>>>>>>>>>> cache with the Connection / Session or having some sort of 
> >>>>>>>>>>>>>>>> UUID
> >>>>>>>>>>>> indN
> >>>>>>>>>>>>>> exed
> >>>>>>>>>>>>>>>> global Map.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Sorry if that has been settled already and I missed it. In 
> >>>>>>>>>>>>>>>> this
> >>>>>>>>>>>> case
> >>>>>>>>>>>>>>>> could anyone point me to the discussion?
> >>>>>>>>>>>>>>> Hi Jan,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I don't think anyone has discussed the idea of tying the cache
> >>>>>>>>>> to an
> >>>>>>>>>>>>>>> individual TCP session yet.  I agree that since the cache is
> >>>>>>>>>>>> intended to
> >>>>>>>>>>>>>>> be used only by a single follower or client, it's an 
> >>>>>>>>>>>>>>> interesting
> >>>>>>>>>>>> thing
> >>>>>>>>>>>>>>> to think about.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I guess the obvious disadvantage is that whenever your TCP
> >>>>>>>>>> session
> >>>>>>>>>>>>>>> drops, you have to make a full fetch request rather than an
> >>>>>>>>>>>> incremental
> >>>>>>>>>>>>>>> one.  It's not clear to me how often this happens in practice 
> >>>>>>>>>>>>>>> --
> >>>>>>>>>> it
> >>>>>>>>>>>>>>> probably depends a lot on the quality of the network.  From a
> >>>>>>>>>> code
> >>>>>>>>>>>>>>> perspective, it might also be a bit difficult to access data
> >>>>>>>>>>>> associated
> >>>>>>>>>>>>>>> with the Session from classes like KafkaApis (although we 
> >>>>>>>>>>>>>>> could
> >>>>>>>>>>>> refactor
> >>>>>>>>>>>>>>> it to make this easier).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It's also clear that even if we tie the cache to the session, 
> >>>>>>>>>>>>>>> we
> >>>>>>>>>>>> still
> >>>>>>>>>>>>>>> have to have limits on the number of caches we're willing to
> >>>>>>>>>> create.
> >>>>>>>>>>>>>>> And probably we should reserve some cache slots for each
> >>>>>>>>>> follower, so
> >>>>>>>>>>>>>>> that clients don't take all of them.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Id rather see a protocol in which the client is hinting the
> >>>>>>>>>> broker
> >>>>>>>>>>>>>> that,
> >>>>>>>>>>>>>>>> he is going to use the feature instead of a client
> >>>>>>>>>>>>>>>> realizing that the broker just offered the feature 
> >>>>>>>>>>>>>>>> (regardless
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>> protocol version which should only indicate that the feature
> >>>>>>>>>>>>>>>> would be usable).
> >>>>>>>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do think 
> >>>>>>>>>>>>>>> that
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> server should have the option of not accepting incremental
> >>>>>>>>>> requests
> >>>>>>>>>>>> from
> >>>>>>>>>>>>>>> specific clients, in order to save memory space.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This seems to work better with a per
> >>>>>>>>>>>>>>>> connection/session attached Metadata than with a Map and 
> >>>>>>>>>>>>>>>> could
> >>>>>>>>>>>> allow
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>> easier client implementations.
> >>>>>>>>>>>>>>>> It would also make Client-side code easier as there wouldn't
> >>>>>>>>>> be any
> >>>>>>>>>>>>>>>> Cache-miss error Messages to handle.
> >>>>>>>>>>>>>>> It is nice not to have to handle cache-miss responses, I 
> >>>>>>>>>>>>>>> agree.
> >>>>>>>>>>>>>>> However, TCP sessions aren't exposed to most of our 
> >>>>>>>>>>>>>>> client-side
> >>>>>>>>>> code.
> >>>>>>>>>>>>>>> For example, when the Producer creates a message and hands it
> >>>>>>>>>> off to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>> NetworkClient, the NC will transparently re-connect and 
> >>>>>>>>>>>>>>> re-send a
> >>>>>>>>>>>>>>> message if the first send failed.  The higher-level code will
> >>>>>>>>>> not be
> >>>>>>>>>>>>>>> informed about whether the TCP session was re-established,
> >>>>>>>>>> whether an
> >>>>>>>>>>>>>>> existing TCP session was used, and so on.  So overall I would
> >>>>>>>>>> still
> >>>>>>>>>>>> lean
> >>>>>>>>>>>>>>> towards not coupling this to the TCP session...
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>>> Colin
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>       Thank you again for the KIP. And again, if this was 
> >>>>>>>>>>>>>>>> clarified
> >>>>>>>>>>>> already
> >>>>>>>>>>>>>>>> please drop me a hint where I could read about it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best Jan
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I created a KIP to improve the scalability and latency of
> >>>>>>>>>>>>>> FetchRequest:
> >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>>>>>>> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
> >>>>>>>>>>>>>> Partition+Scalability
> >>>>>>>>>>>>>>>>> Please take a look.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> cheers,
> >>>>>>>>>>>>>>>>> Colin
> 

Reply via email to