On Tue, Dec 12, 2017, at 11:48, Becket Qin wrote:
> Hi Colin,
> 
> I am not completely sure, but I am hoping that when we do
> FileChannel.transferTo() the OS will just use a fixed buffer to transfer
> the data to the socket channel without polluting the page cache. But this
> might not be true if we are using SSL.

Hi Becket,

sendfile always uses the page cache.  See this comment by Linus
Torvalds: http://yarchive.net/comp/linux/sendfile.html

> sendfile() wants the source to be in the page cache, because the whole
> point of sendfile() was to avoid a copy. 
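
For what it's worth, here is a minimal Java sketch of the zero-copy
path (a hypothetical helper, not actual broker code; error handling
trimmed).  FileChannel.transferTo() maps to sendfile(2) on Linux: it
avoids the copy through userspace buffers, but the source file is
still read via the page cache:

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.WritableByteChannel;

    class ZeroCopySend {
        // Send `count` bytes of a log segment starting at `position`
        // straight to the destination channel.  No userspace copy is
        // made, but the file pages still pass through the page cache.
        static long sendLogSlice(FileChannel log, long position, long count,
                                 WritableByteChannel dest) throws IOException {
            long sent = 0;
            while (sent < count) {
                long n = log.transferTo(position + sent, count - sent, dest);
                if (n <= 0) break;  // destination can't accept more right now
                sent += n;
            }
            return sent;
        }
    }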

> 
> The point I want to make is that avoiding the binary search on the
> index file and avoiding reads of the log segments during fetch has some
> additional benefits. So if the solution works for the current KIP, it
> might be a better choice.

Let's discuss this in a follow-on KIP.

> 
> Regarding the fixed session for the entire life of the clients, it may
> also be related to another issue we want to solve with the broker epoch
> in KAFKA-6029. If we can make sure the session id will not change over
> the lifetime of a client, we can use that session id instead of creating
> a separate broker epoch and adding it to the FetchRequest.

These issues are not really related.  That JIRA is proposing a "broker
epoch" that would uniquely identify different incarnations of the
broker.  In contrast, the fetch session ID doesn't uniquely identify
even a single client, because a single client can have multiple fetcher
threads.  In that case, each thread performing a fetch would have a
fetcher ID.  Even if you only have a single fetcher thread, a given
follower will have a different fetch session ID on each different
leader.
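
As a rough illustration (hypothetical names, not actual broker code),
the sessions a single follower holds look more like a map keyed by
(leader, fetcher thread) than like one identifier per broker
incarnation:

    import java.util.HashMap;
    import java.util.Map;

    class FollowerFetchSessions {
        // One fetch session per (leader, fetcher thread) pair.  With 4
        // fetcher threads and 10 leaders, a follower may hold 40
        // distinct session IDs, none of which identifies the broker
        // incarnation the way a broker epoch would.
        private final Map<String, Integer> sessionIds = new HashMap<>();

        void onNewSession(int leaderId, int fetcherId, int sessionId) {
            sessionIds.put(leaderId + "-" + fetcherId, sessionId);
        }
    }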

best,
Colin

> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> On Mon, Dec 11, 2017 at 3:25 PM, Colin McCabe <cmcc...@apache.org> wrote:
> 
> > On Mon, Dec 11, 2017, at 14:51, Becket Qin wrote:
> > > Hi Jun,
> > >
> > > Yes, I agree avoiding reading the log segment is not the primary goal
> > > for this KIP. I brought this up because recently I saw a significant
> > > throughput impact when a broker was down for 20 - 30 min and rejoined
> > > a cluster. The bytes-in rate could drop by 50% while that broker was
> > > trying to catch up with the leaders, even in a big cluster (a single
> > > broker should not have such a big impact on the entire cluster).
> >
> > Hi Becket,
> >
> > It sounds like the broker was fetching older data which wasn't in the
> > page cache?  That sounds like it could definitely have a negative impact
> > on the cluster.  It is a little troubling if the impact is a 50% drop in
> > throughput, though.
> >
> > It's a little unclear how to mitigate this, since old data is definitely
> > not going to be in memory.  Maybe we need to work on making sure that
> > slow fetches performed by one fetcher do not slow down all the other
> > worker threads...?
> >
> > > And some users also reported such cascading degradation, i.e. when
> > > one consumer lags behind, the other consumers will also start to lag
> > > behind. So I think addressing this is an important improvement. I
> > > will run some tests and see if returning at the index boundary to
> > > avoid the log scan would help address this issue. That being said, I
> > > agree that we don't have to address this issue in this KIP. I can
> > > submit another KIP later if avoiding the log segment scan helps.
> >
> > Thanks, that's really interesting.
> >
> > I agree that it might be better in a follow-on KIP.
> >
> > Is the goal to improve the cold-cache case?  Maybe avoid looking at the
> > index file altogether (except for the initial setup)?  That would be a
> > nice improvement for consumers fetching big sequential chunks of
> > historic data.
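> >
> > Roughly what I have in mind, as a sketch only (hypothetical names,
> > not anything from the KIP): remember where the last fetch for each
> > partition ended, so a follow-up sequential fetch needs no binary
> > search on the offset index at all.
> >
> >     import java.util.concurrent.ConcurrentHashMap;
> >
> >     class SequentialFetchCache {
> >         static final class CachedPosition {
> >             final long nextOffset;    // first offset not yet returned
> >             final long filePosition;  // byte position of that offset
> >             CachedPosition(long nextOffset, long filePosition) {
> >                 this.nextOffset = nextOffset;
> >                 this.filePosition = filePosition;
> >             }
> >         }
> >
> >         private final ConcurrentHashMap<String, CachedPosition> lastServed =
> >             new ConcurrentHashMap<>();
> >
> >         long filePositionFor(String partition, long fetchOffset) {
> >             CachedPosition c = lastServed.get(partition);
> >             if (c != null && c.nextOffset == fetchOffset) {
> >                 return c.filePosition;  // sequential read: skip the index
> >             }
> >             return binarySearchIndex(partition, fetchOffset);  // cold start
> >         }
> >
> >         // Stand-in for the existing offset index lookup.
> >         long binarySearchIndex(String partition, long offset) { return 0L; }
> >     }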
> >
> > regards,
> > Colin
> >
> >
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Dec 11, 2017 at 1:06 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Colin,
> > > >
> > > > I went over the latest KIP wiki and have a few comments here.
> > > >
> > > > 1) The KIP says that the client ID is a string if the session belongs
> > > > to a Kafka consumer, and that it is a numerical follower ID if the
> > > > session belongs to a follower. Can we have a consistent type for the
> > > > client ID?
> > > >
> > > > 2) "The numeric follower ID, if this fetch session belongs to a Kafka
> > > > broker". If the broker has multiple replica fetcher thread, do they all
> > > > have the same follower Id in teh leader broker?
> > > >
> > > > 3) One of the condition for evicting an existing session is that "The
> > new
> > > > session belongs to a follower, and the existing session belongs to a
> > > > regular consumer". I am not sure the session from follower should also
> > be
> > > > restricted by the newly added config. It seems that we will always
> > create
> > > > lots for FetchRequest from follower brokers. Maybe the
> > > > "max.incremental.fetch.session.cache.slots" should only be applies if
> > the
> > > > FetchRequest comes from a client consumer?
> > > >
> > > > 4) Not sure I fully understand how the "The last dirty sequence
> > number" is
> > > > used. It is mentioned that "Let P1 have a last dirty sequence number of
> > > > 100, and P2 have a last dirty sequence number of 101. An incremental
> > fetch
> > > > request with sequence number 100 will return information about both P1
> > and
> > > > P2." But would be the fetch offset for P2 in this case, if the last
> > fetch
> > > > offset stored in the Fetch Session for P2 is associated with the last
> > dirty
> > > > sequence number 101 for P2? My gut feel is that you would have to
> > stored
> > > > the fetch offset for sequence number 100 for P2 as well. Did I miss
> > > > something here?
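> > > >
> > > > For reference, my (possibly wrong) reading of the inclusion rule, as
> > > > a tiny sketch:
> > > >
> > > >     // A partition is included in an incremental response when its
> > > >     // last dirty sequence number is at or past the request's
> > > >     // sequence number, so both P1 (dirty at 100) and P2 (dirty at
> > > >     // 101) are returned for a request with sequence number 100.
> > > >     boolean shouldInclude(long lastDirtySequence, long requestSequence) {
> > > >         return lastDirtySequence >= requestSequence;
> > > >     }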
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Sun, Dec 10, 2017 at 11:15 PM, Becket Qin <becket....@gmail.com>
> > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > I see. Yes, that makes sense. Are we going to do that only for the
> > > > > fetches whose per-partition fetch size cannot reach the first index
> > > > > entry after the fetch position, or are we going to do that for any
> > > > > fetch? If we do that for any fetch, then we will still need to read
> > > > > the actual log segment, which could be expensive if the data is no
> > > > > longer in the cache. This hurts performance if some fetches are on
> > > > > old log segments.
> > > > >
> > > > > I took a quick look at the clusters we have. The idle topic ratio
> > > > > varies depending on the usage of the cluster. For our metric
> > > > > cluster and database replication clusters, almost all the topics
> > > > > are actively used. For tracking clusters, ~70% of topics have data
> > > > > coming in at different rates. For other clusters, such as queuing
> > > > > and data deployment, there are more idle topics and the traffic is
> > > > > more bursty (I don't have the exact number here).
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Sun, Dec 10, 2017 at 10:17 PM, Colin McCabe <cmcc...@apache.org>
> > > > wrote:
> > > > >
> > > > > > On Fri, Dec 8, 2017, at 16:56, Jun Rao wrote:
> > > > > > > Hi, Jiangjie,
> > > > > > >
> > > > > > > What I described is almost the same as yours. The only extra
> > > > > > > thing is to scan the log segment from the identified index entry
> > > > > > > a bit more, to find a file position that ends at a message set
> > > > > > > boundary and is less than the partition-level fetch size. This
> > > > > > > way, we still preserve the current semantic of not returning
> > > > > > > more bytes than the fetch size unless there is a single message
> > > > > > > set larger than the fetch size.
> > > > > > >
> > > > > > > In a typical cluster at LinkedIn, what's the percentage of idle
> > > > > > > partitions?
> > > > > >
> > > > > > Yeah, that would be a great number to get.
> > > > > >
> > > > > > Of course, KIP-227 will also benefit partitions that are not
> > completely
> > > > > > idle.  For instance, a partition that's getting just one message a
> > > > > > second will appear in many fetch requests, unless every other
> > partition
> > > > > > in the system is also only getting a low rate of incoming messages.
> > > > > >
> > > > > > regards,
> > > > > > Colin
> > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 6, 2017 at 6:57 PM, Becket Qin <becket....@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > Yes, we still need to handle the corner case. And you are
> > > > > > > > right, it is all about the trade-off between simplicity and
> > > > > > > > the performance gain.
> > > > > > > >
> > > > > > > > I was thinking that the brokers always return at least
> > > > > > > > log.index.interval.bytes per partition to the consumer, just
> > > > > > > > like we will return at least one message to the user. This
> > > > > > > > way we don't need to worry about the case where the fetch
> > > > > > > > size is smaller than the index interval. We may just need to
> > > > > > > > let users know about this behavior change.
> > > > > > > >
> > > > > > > > Not sure if I completely understand your solution, but I
> > > > > > > > think we are thinking about the same thing, i.e. for the
> > > > > > > > first fetch asking for offset x0, we will need to do a binary
> > > > > > > > search to find the position p0. Then the broker will iterate
> > > > > > > > over the index entries, starting from the first index entry
> > > > > > > > whose file position is greater than p0, until it reaches the
> > > > > > > > index entry (x1, p1) such that p1 - p0 is just under the
> > > > > > > > fetch size but the next entry would exceed the fetch size. We
> > > > > > > > then return the bytes from p0 to p1. Meanwhile the broker
> > > > > > > > caches the next fetch (x1, p1), so when the next fetch comes,
> > > > > > > > it will just iterate over the offset index entries starting
> > > > > > > > at (x1, p1).
> > > > > > > >
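> > > > > > > > In rough Java, just to pin the idea down (a sketch with
> > > > > > > > hypothetical shapes, not actual broker code; `entries` holds the
> > > > > > > > offset index as (offset, filePosition) pairs, and `slot0` is the
> > > > > > > > first slot after p0, found by the one binary search):
> > > > > > > >
> > > > > > > >     // Walk the index linearly while the *next* entry still fits
> > > > > > > >     // within the fetch size; the returned slot gives p1.
> > > > > > > >     static int endSlot(long[][] entries, int slot0, long p0,
> > > > > > > >                        long fetchSize) {
> > > > > > > >         int slot = slot0;
> > > > > > > >         while (slot + 1 < entries.length
> > > > > > > >                && entries[slot + 1][1] - p0 <= fetchSize) {
> > > > > > > >             slot++;
> > > > > > > >         }
> > > > > > > >         // Return bytes [p0, entries[slot][1]) and cache
> > > > > > > >         // (entries[slot][0], entries[slot][1]) for the next fetch.
> > > > > > > >         return slot;
> > > > > > > >     }
> > > > > > > >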
> > > > > > > > It is true that in the above approach, log compacted topics
> > > > > > > > need to be handled. It seems that this can be solved by
> > > > > > > > checking whether the cached index and the new log index are
> > > > > > > > still the same index object. If they are not the same, we can
> > > > > > > > fall back to a binary search with the cached offset. It is
> > > > > > > > admittedly more complicated than the current logic, but given
> > > > > > > > that the binary search logic already exists, it seems the
> > > > > > > > additional object sanity check is not too much work.
> > > > > > > >
> > > > > > > > Not sure if the above implementation is simple enough to
> > justify
> > > > the
> > > > > > > > performance improvement. Let me know if you see potential
> > > > complexity.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Dec 6, 2017 at 4:48 PM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > > > > >
> > > > > > > > > Hi, Becket,
> > > > > > > > >
> > > > > > > > > Yes, I agree that it's rare to have the fetch size smaller
> > > > > > > > > than the index interval. It's just that we still need
> > > > > > > > > additional code to handle the rare case.
> > > > > > > > >
> > > > > > > > > If you go this far, a more general approach (i.e., without
> > > > > > > > > returning at the index boundary) is the following. We can
> > > > > > > > > cache the following metadata for the next fetch offset: the
> > > > > > > > > file position in the log segment, and the first index slot
> > > > > > > > > at or after that file position. When serving a fetch
> > > > > > > > > request, we scan the index entries from the cached index
> > > > > > > > > slot until we hit the fetch size. We can then send the data
> > > > > > > > > at the message set boundary and update the cached metadata
> > > > > > > > > for the next fetch offset. This is kind of complicated, but
> > > > > > > > > probably not more so than your approach if the corner case
> > > > > > > > > has to be handled.
> > > > > > > > >
> > > > > > > > > In both the above approach and your approach, we need
> > > > > > > > > additional logic to handle compacted topics, since a log
> > > > > > > > > segment (and therefore its index) can be replaced between
> > > > > > > > > two consecutive fetch requests.
> > > > > > > > >
> > > > > > > > > Overall, I agree that the general approach that you proposed
> > > > > applies
> > > > > > more
> > > > > > > > > widely since we get the benefit even when all topics are high
> > > > > volume.
> > > > > > > > It's
> > > > > > > > > just that it would be better if we could think of a simpler
> > > > > > > > implementation.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Tue, Dec 5, 2017 at 9:38 PM, Becket Qin <
> > becket....@gmail.com
> > > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > That is true, but in reality it seems rare that the fetch
> > > > > > > > > > size is smaller than the index interval. In the worst
> > > > > > > > > > case, we may need to do another lookup. In the future,
> > > > > > > > > > when we have the mechanism to inform the clients about
> > > > > > > > > > the broker configurations, the clients may want to
> > > > > > > > > > configure themselves correspondingly as well, e.g. max
> > > > > > > > > > message size, max timestamp difference, etc.
> > > > > > > > > >
> > > > > > > > > > On the other hand, we are not guaranteeing that the
> > > > > > > > > > returned bytes in a partition are always bounded by the
> > > > > > > > > > per-partition fetch size, because we are going to return
> > > > > > > > > > at least one message. So the per-partition fetch size
> > > > > > > > > > already seems to be a soft limit. Since we are
> > > > > > > > > > introducing a new fetch protocol and this is related, it
> > > > > > > > > > might be worth considering this option.
> > > > > > > > > >
> > > > > > > > > > BTW, one reason I brought this up again is that yesterday
> > > > > > > > > > we had a presentation from Uber regarding end-to-end
> > > > > > > > > > latency, and they are seeing this binary search behavior
> > > > > > > > > > impacting latency due to paging in/out of the index file.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Dec 5, 2017 at 5:55 PM, Jun Rao <j...@confluent.io>
> > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Jiangjie,
> > > > > > > > > > >
> > > > > > > > > > > Not sure returning the fetch response at the index
> > > > > > > > > > > boundary is a general solution. The index interval is
> > > > > > > > > > > configurable. If one configures the index interval to
> > > > > > > > > > > be larger than the per-partition fetch size, we
> > > > > > > > > > > probably have to return data not at the index boundary.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin <
> > > > > becket....@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Colin,
> > > > > > > > > > > >
> > > > > > > > > > > > Thinking about this again, I do see the reason that
> > > > > > > > > > > > we want to have an epoch to avoid out-of-order
> > > > > > > > > > > > registration of the interested set. But I am
> > > > > > > > > > > > wondering if the following semantics would meet what
> > > > > > > > > > > > we want better:
> > > > > > > > > > > >  - Session ID: the id assigned to a single client for
> > > > > > > > > > > > its entire lifetime, i.e. it does not change when the
> > > > > > > > > > > > interested partitions change.
> > > > > > > > > > > >  - Epoch: the interested set epoch, only updated when
> > > > > > > > > > > > a full fetch request comes, which may change the
> > > > > > > > > > > > interested partition set.
> > > > > > > > > > > > This will ensure that the registered interested set
> > > > > > > > > > > > is always the latest registration, and the clients
> > > > > > > > > > > > can change the interested partition set without
> > > > > > > > > > > > creating another session.
> > > > > > > > > > > >
> > > > > > > > > > > > Also, I want to bring up the way the leader responds
> > > > > > > > > > > > to the FetchRequest again. I think it would be a big
> > > > > > > > > > > > improvement if we just returned the responses at an
> > > > > > > > > > > > index entry boundary or the log end. There are a few
> > > > > > > > > > > > benefits:
> > > > > > > > > > > > 1. The leader does not need the follower to provide
> > > > > > > > > > > > the offsets.
> > > > > > > > > > > > 2. The fetch requests no longer need to do a binary
> > > > > > > > > > > > search on the index; they just need to do linear
> > > > > > > > > > > > access to the index file, which is much more cache
> > > > > > > > > > > > friendly.
> > > > > > > > > > > >
> > > > > > > > > > > > Assuming the leader can get the last offsets returned
> > > > > > > > > > > > to the clients cheaply, I am still not sure why it is
> > > > > > > > > > > > necessary for the followers to repeat the offsets in
> > > > > > > > > > > > the incremental fetch every time. Intuitively, they
> > > > > > > > > > > > should only update the offsets when the leader has
> > > > > > > > > > > > the wrong offsets; in most cases, the incremental
> > > > > > > > > > > > fetch request should just be empty. Otherwise we may
> > > > > > > > > > > > not be saving much when there are continuous small
> > > > > > > > > > > > requests going to each partition, which could be
> > > > > > > > > > > > normal for some low-latency systems.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe <
> > > > > > cmcc...@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > > > > > > > > > > > Hi Colin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Addressing the topic of how to manage slots from
> > > > > > > > > > > > > > the other thread: with TCP connections, all this
> > > > > > > > > > > > > > essentially comes for free.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jan,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I don't think that it's accurate to say that cache
> > > > > management
> > > > > > > > > "comes
> > > > > > > > > > > for
> > > > > > > > > > > > > free" by coupling the incremental fetch session with
> > the
> > > > > TCP
> > > > > > > > > session.
> > > > > > > > > > > > > When a new TCP session is started by a fetch
> > request, you
> > > > > > still
> > > > > > > > > have
> > > > > > > > > > to
> > > > > > > > > > > > > decide whether to grant that request an incremental
> > fetch
> > > > > > session
> > > > > > > > > or
> > > > > > > > > > > > > not.  If your answer is that you always grant the
> > > > request,
> > > > > I
> > > > > > > > would
> > > > > > > > > > > argue
> > > > > > > > > > > > > that you do not have cache management.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I guess you could argue that timeouts are cache
> > > > management,
> > > > > > but I
> > > > > > > > > > don't
> > > > > > > > > > > > > find that argument persuasive.  Anyone could just
> > create
> > > > a
> > > > > > lot of
> > > > > > > > > TCP
> > > > > > > > > > > > > sessions and use a lot of resources, in that case.
> > So
> > > > > there
> > > > > > is
> > > > > > > > > > > > > essentially no limit on memory use.  In any case, TCP
> > > > > > sessions
> > > > > > > > > don't
> > > > > > > > > > > > > help us implement fetch session timeouts.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I would still argue we disable it by default, add
> > > > > > > > > > > > > > a flag in the broker to ask the leader to maintain
> > > > > > > > > > > > > > the cache while replicating, and also make it
> > > > > > > > > > > > > > optional in consumers (defaulting to off), so one
> > > > > > > > > > > > > > can turn it on where it really hurts: prominently,
> > > > > > > > > > > > > > MirrorMaker and audit consumers.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I agree with Jason's point from earlier in the
> > thread.
> > > > > > Adding
> > > > > > > > > extra
> > > > > > > > > > > > > configuration knobs that aren't really necessary can
> > harm
> > > > > > > > > usability.
> > > > > > > > > > > > > Certainly asking people to manually turn on a feature
> > > > > "where
> > > > > > it
> > > > > > > > > > really
> > > > > > > > > > > > > hurts" seems to fall in that category, when we could
> > > > easily
> > > > > > > > enable
> > > > > > > > > it
> > > > > > > > > > > > > automatically for them.
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Otherwise, I left a few remarks in-line, which
> > > > > > > > > > > > > > should help you understand my view of the
> > > > > > > > > > > > > > situation better.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best Jan
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On 05.12.2017 08:06, Colin McCabe wrote:
> > > > > > > > > > > > > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak
> > wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > > > > > > > > > > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin
> > wrote:
> > > > > > > > > > > > > > >>>> Thanks for the explanation, Colin. A few more
> > > > > > questions.
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>>> The session epoch is not complex.  It's just
> > a
> > > > > number
> > > > > > > > which
> > > > > > > > > > > > > increments
> > > > > > > > > > > > > > >>>>> on each incremental fetch.  The session
> > epoch is
> > > > > also
> > > > > > > > > useful
> > > > > > > > > > > for
> > > > > > > > > > > > > > >>>>> debugging-- it allows you to match up
> > requests
> > > > and
> > > > > > > > > responses
> > > > > > > > > > > when
> > > > > > > > > > > > > > >>>>> looking at log files.
> > > > > > > > > > > > > > >>>> Currently each request in Kafka has a
> > correlation
> > > > id
> > > > > > to
> > > > > > > > help
> > > > > > > > > > > match
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >>>> requests and responses. Is epoch doing
> > something
> > > > > > > > > differently?
> > > > > > > > > > > > > > >>> Hi Becket,
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> The correlation ID is used within a single TCP
> > > > > > session, to
> > > > > > > > > > > uniquely
> > > > > > > > > > > > > > >>> associate a request with a response.  The
> > > > correlation
> > > > > > ID is
> > > > > > > > > not
> > > > > > > > > > > > > unique
> > > > > > > > > > > > > > >>> (and has no meaning) outside the context of
> > that
> > > > > > single TCP
> > > > > > > > > > > > session.
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> Keep in mind, NetworkClient is in charge of TCP
> > > > > > sessions,
> > > > > > > > and
> > > > > > > > > > > > > generally
> > > > > > > > > > > > > > >>> tries to hide that information from the upper
> > > > layers
> > > > > > of the
> > > > > > > > > > code.
> > > > > > > > > > > > So
> > > > > > > > > > > > > > >>> when you submit a request to NetworkClient, you
> > > > don't
> > > > > > know
> > > > > > > > if
> > > > > > > > > > > that
> > > > > > > > > > > > > > >>> request creates a TCP session, or reuses an
> > > > existing
> > > > > > one.
> > > > > > > > > > > > > > >>>>> Unfortunately, this doesn't work.  Imagine
> > the
> > > > > client
> > > > > > > > > misses
> > > > > > > > > > an
> > > > > > > > > > > > > > >>>>> increment fetch response about a partition.
> > And
> > > > > > then the
> > > > > > > > > > > > > partition is
> > > > > > > > > > > > > > >>>>> never updated after that.  The client has no
> > way
> > > > to
> > > > > > know
> > > > > > > > > > about
> > > > > > > > > > > > the
> > > > > > > > > > > > > > >>>>> partition, since it won't be included in any
> > > > future
> > > > > > > > > > incremental
> > > > > > > > > > > > > fetch
> > > > > > > > > > > > > > >>>>> responses.  And there are no offsets to
> > compare,
> > > > > > since
> > > > > > > > the
> > > > > > > > > > > > > partition is
> > > > > > > > > > > > > > >>>>> simply omitted from the response.
> > > > > > > > > > > > > > >>>> I am curious about in which situation would
> > the
> > > > > > follower
> > > > > > > > > miss
> > > > > > > > > > a
> > > > > > > > > > > > > response
> > > > > > > > > > > > > > >>>> of a partition. If the entire FetchResponse is
> > > > lost
> > > > > > (e.g.
> > > > > > > > > > > > timeout),
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >>>> follower would disconnect and retry. That will
> > > > > result
> > > > > > in
> > > > > > > > > > > sending a
> > > > > > > > > > > > > full
> > > > > > > > > > > > > > >>>> FetchRequest.
> > > > > > > > > > > > > > >>> Basically, you are proposing that we rely on
> > TCP
> > > > for
> > > > > > > > reliable
> > > > > > > > > > > > > delivery
> > > > > > > > > > > > > > >>> in a distributed system.  That isn't a good
> > idea
> > > > for
> > > > > a
> > > > > > > > bunch
> > > > > > > > > of
> > > > > > > > > > > > > > >>> different reasons.  First of all, TCP timeouts
> > tend
> > > > > to
> > > > > > be
> > > > > > > > > very
> > > > > > > > > > > > > long.  So
> > > > > > > > > > > > > > >>> if the TCP session timing out is your error
> > > > detection
> > > > > > > > > > mechanism,
> > > > > > > > > > > > you
> > > > > > > > > > > > > > >>> have to wait minutes for messages to timeout.
> > Of
> > > > > > course,
> > > > > > > > we
> > > > > > > > > > add
> > > > > > > > > > > a
> > > > > > > > > > > > > > >>> timeout on top of that after which we declare
> > the
> > > > > > > > connection
> > > > > > > > > > bad
> > > > > > > > > > > > and
> > > > > > > > > > > > > > >>> manually close it.  But just because the
> > session is
> > > > > > closed
> > > > > > > > on
> > > > > > > > > > one
> > > > > > > > > > > > end
> > > > > > > > > > > > > > >>> doesn't mean that the other end knows that it
> > is
> > > > > > closed.
> > > > > > > > So
> > > > > > > > > > the
> > > > > > > > > > > > > leader
> > > > > > > > > > > > > > >>> may have to wait quite a long time before TCP
> > > > decides
> > > > > > that
> > > > > > > > > yes,
> > > > > > > > > > > > > > >>> connection X from the follower is dead and not
> > > > coming
> > > > > > back,
> > > > > > > > > > even
> > > > > > > > > > > > > though
> > > > > > > > > > > > > > >>> gremlins ate the FIN packet which the follower
> > > > > > > > > > > > > > >>> attempted to transmit.
> > > > > > > > > > > > > > >>> If the cache state is tied to that TCP
> > session, we
> > > > > > have to
> > > > > > > > > keep
> > > > > > > > > > > > that
> > > > > > > > > > > > > > >>> cache around for a much longer time than we
> > should.
> > > > > > > > > > > > > > >> Hi,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> I see this from a different perspective. The
> > > > > > > > > > > > > > >> cache expiry time has the same semantics as the
> > > > > > > > > > > > > > >> idle connection time in this scenario. It is the
> > > > > > > > > > > > > > >> time range in which we expect the client to come
> > > > > > > > > > > > > > >> back and reuse its broker-side state. I would
> > > > > > > > > > > > > > >> argue that on close we would get an extra shot
> > > > > > > > > > > > > > >> at cleaning up the session state early, as
> > > > > > > > > > > > > > >> opposed to always waiting for that duration for
> > > > > > > > > > > > > > >> expiry to happen.
> > > > > > > > > > > > > > > Hi Jan,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The idea here is that the incremental fetch cache
> > > > > expiry
> > > > > > time
> > > > > > > > > can
> > > > > > > > > > > be
> > > > > > > > > > > > > > > much shorter than the TCP session timeout.  In
> > > > general
> > > > > > the
> > > > > > > > TCP
> > > > > > > > > > > > session
> > > > > > > > > > > > > > > timeout is common to all TCP connections, and
> > very
> > > > > > long.  To
> > > > > > > > > make
> > > > > > > > > > > > these
> > > > > > > > > > > > > > > numbers a little more concrete, the TCP session
> > > > timeout
> > > > > > is
> > > > > > > > > often
> > > > > > > > > > > > > > > configured to be 2 hours on Linux.  (See
> > > > > > > > > > > > > > > https://www.cyberciti.biz/
> > tips/linux-increasing-or-
> > > > > > > > > > > > > decreasing-tcp-sockets-timeouts.html
> > > > > > > > > > > > > > > )  The timeout I was proposing for incremental
> > fetch
> > > > > > sessions
> > > > > > > > > was
> > > > > > > > > > > one
> > > > > > > > > > > > > or
> > > > > > > > > > > > > > > two minutes at most.
> > > > > > > > > > > > > > Currently this is taken care of by
> > > > > > > > > > > > > > connections.max.idle.ms on the broker, which
> > > > > > > > > > > > > > defaults to a few minutes.
> > > > > > > > > > > > >
> > > > > > > > > > > > > It is 10 minutes by default, which is longer than
> > what we
> > > > > > want
> > > > > > > > the
> > > > > > > > > > > > > incremental fetch session timeout to be.  There's no
> > > > reason
> > > > > > to
> > > > > > > > > couple
> > > > > > > > > > > > > these two things.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Also, this is something we could let the client
> > > > > > > > > > > > > > change if we really wanted to. So there is no need
> > > > > > > > > > > > > > to worry about coupling our implementation to
> > > > > > > > > > > > > > timeouts given by the OS; with TCP one always has
> > > > > > > > > > > > > > full control over the worst-case times, plus one
> > > > > > > > > > > > > > gets the extra shot at cleaning up early when the
> > > > > > > > > > > > > > close comes through, which is the majority of
> > > > > > > > > > > > > > cases.
> > > > > > > > > > > > >
> > > > > > > > > > > > > In the majority of cases, the TCP session will be
> > > > > > re-established.
> > > > > > > > > In
> > > > > > > > > > > > > that case, we have to send a full fetch request
> > rather
> > > > than
> > > > > > an
> > > > > > > > > > > > > incremental fetch request.
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>> Secondly, from a software engineering
> > perspective,
> > > > > it's
> > > > > > > > not a
> > > > > > > > > > > good
> > > > > > > > > > > > > idea
> > > > > > > > > > > > > > >>> to try to tightly tie together TCP and our
> > code.
> > > > We
> > > > > > would
> > > > > > > > > have
> > > > > > > > > > > to
> > > > > > > > > > > > > > >>> rework how we interact with NetworkClient so
> > that
> > > > we
> > > > > > are
> > > > > > > > > aware
> > > > > > > > > > of
> > > > > > > > > > > > > things
> > > > > > > > > > > > > > >>> like TCP sessions closing or opening.  We would
> > > > have
> > > > > > to be
> > > > > > > > > > > careful
> > > > > > > > > > > > > > >>> preserve the ordering of incoming messages when
> > > > doing
> > > > > > > > things
> > > > > > > > > > like
> > > > > > > > > > > > > > >>> putting incoming requests on to a queue to be
> > > > > > processed by
> > > > > > > > > > > multiple
> > > > > > > > > > > > > > >>> threads.  It's just a lot of complexity to
> > add, and
> > > > > > there's
> > > > > > > > > no
> > > > > > > > > > > > > upside.
> > > > > > > > > > > > > > >> I see the point here. And I had a small chat
> > with
> > > > Dong
> > > > > > Lin
> > > > > > > > > > already
> > > > > > > > > > > > > > >> making me aware of this. I tried out the
> > approaches
> > > > > and
> > > > > > > > > propose
> > > > > > > > > > > the
> > > > > > > > > > > > > > >> following:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> The client starts and does a full fetch. It then
> > > > > > > > > > > > > > >> does incremental fetches. The connection to the
> > > > > > > > > > > > > > >> broker dies and is re-established by
> > > > > > > > > > > > > > >> NetworkClient under the hood.
> > > > > > > > > > > > > > >> The broker sees an incremental fetch without
> > > > > > > > > > > > > > >> having state => returns an error. The client
> > > > > > > > > > > > > > >> sees the error, does a full fetch, and goes back
> > > > > > > > > > > > > > >> to incrementally fetching.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Having this one additional error round trip is
> > > > > > > > > > > > > > >> essentially the same as when something with the
> > > > > > > > > > > > > > >> sessions or epoch changed unexpectedly for the
> > > > > > > > > > > > > > >> client (say, expiry).
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> So it's nothing extra added, but the conditions
> > > > > > > > > > > > > > >> are easier to evaluate, especially since we do
> > > > > > > > > > > > > > >> everything with NetworkClient. Other
> > > > > > > > > > > > > > >> implementers of the protocol are free to
> > > > > > > > > > > > > > >> optimize this and not do the erroneous round
> > > > > > > > > > > > > > >> trip on the new connection. It's a great plus
> > > > > > > > > > > > > > >> that the client can know when the error is going
> > > > > > > > > > > > > > >> to happen, instead of the server always having
> > > > > > > > > > > > > > >> to report back if something changes unexpectedly
> > > > > > > > > > > > > > >> for the client.
> > > > > > > > > > > > > > > You are assuming that the leader and the follower
> > > > agree
> > > > > > that
> > > > > > > > > the
> > > > > > > > > > > TCP
> > > > > > > > > > > > > > > session drops at the same time.  When there are
> > > > network
> > > > > > > > > problems,
> > > > > > > > > > > > this
> > > > > > > > > > > > > > > may not be true.  The leader may still think the
> > > > > > previous TCP
> > > > > > > > > > > session
> > > > > > > > > > > > > is
> > > > > > > > > > > > > > > active.  In that case, we have to keep the
> > > > incremental
> > > > > > fetch
> > > > > > > > > > > session
> > > > > > > > > > > > > > > state around until we learn otherwise (which
> > could be
> > > > > up
> > > > > > to
> > > > > > > > > that
> > > > > > > > > > 2
> > > > > > > > > > > > hour
> > > > > > > > > > > > > > > timeout I mentioned).  And if we get a new
> > incoming
> > > > > > > > incremental
> > > > > > > > > > > fetch
> > > > > > > > > > > > > > > request, we can't assume that it replaces the
> > > > previous
> > > > > > one,
> > > > > > > > > > because
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > IDs will be different (the new one starts a new
> > > > > session).
> > > > > > > > > > > > > > As mentioned, no reason to fear some time-outs out
> > of
> > > > our
> > > > > > > > control
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>> Imagine that I made an argument that client
> > IDs are
> > > > > > > > "complex"
> > > > > > > > > > and
> > > > > > > > > > > > > should
> > > > > > > > > > > > > > >>> be removed from our APIs.  After all, we can
> > just
> > > > > look
> > > > > > at
> > > > > > > > the
> > > > > > > > > > > > remote
> > > > > > > > > > > > > IP
> > > > > > > > > > > > > > >>> address and TCP port of each connection.
> > Would you
> > > > > > think
> > > > > > > > > that
> > > > > > > > > > > was
> > > > > > > > > > > > a
> > > > > > > > > > > > > > >>> good idea?  The client ID is useful when
> > looking at
> > > > > > logs.
> > > > > > > > > For
> > > > > > > > > > > > > example,
> > > > > > > > > > > > > > >>> if a rebalance is having problems, you want to
> > know
> > > > > > what
> > > > > > > > > > clients
> > > > > > > > > > > > were
> > > > > > > > > > > > > > >>> having a problem.  So having the client ID
> > field to
> > > > > > guide
> > > > > > > > you
> > > > > > > > > > is
> > > > > > > > > > > > > > >>> actually much less "complex" in practice than
> > not
> > > > > > having an
> > > > > > > > > ID.
> > > > > > > > > > > > > > >> I still can't follow why the correlation id will
> > > > > > > > > > > > > > >> not help here. Correlating logs with it usually
> > > > > > > > > > > > > > >> works great, even with primitive tools like grep.
> > > > > > > > > > > > > > > The correlation ID does help somewhat, but
> > certainly
> > > > > not
> > > > > > as
> > > > > > > > > much
> > > > > > > > > > > as a
> > > > > > > > > > > > > > > unique 64-bit ID.  The correlation ID is not
> > unique
> > > > in
> > > > > > the
> > > > > > > > > > broker,
> > > > > > > > > > > > just
> > > > > > > > > > > > > > > unique to a single NetworkClient.  Similarly, the
> > > > > > correlation
> > > > > > > > ID
> > > > > > > > > > is
> > > > > > > > > > > > not
> > > > > > > > > > > > > > > unique on the client side, if there are multiple
> > > > > > Consumers,
> > > > > > > > > etc.
> > > > > > > > > > > > > > Can always bump entropy in correlation IDs; never
> > > > > > > > > > > > > > had a problem of finding too many duplicates.
> > > > > > > > > > > > > > Would be a different KIP though.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >>> Similarly, if metadata responses had epoch
> > numbers
> > > > > > (simple
> > > > > > > > > > > > > incrementing
> > > > > > > > > > > > > > >>> numbers), we would not have to debug problems
> > like
> > > > > > clients
> > > > > > > > > > > > > accidentally
> > > > > > > > > > > > > > >>> getting old metadata from servers that had been
> > > > > > partitioned
> > > > > > > > > off
> > > > > > > > > > > > from
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > >>> network for a while.  Clients would know the
> > > > > difference
> > > > > > > > > between
> > > > > > > > > > > old
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > >>> new metadata.  So putting epochs in to the
> > metadata
> > > > > > request
> > > > > > > > > is
> > > > > > > > > > > much
> > > > > > > > > > > > > less
> > > > > > > > > > > > > > >>> "complex" operationally, even though it's an
> > extra
> > > > > > field in
> > > > > > > > > the
> > > > > > > > > > > > > request.
> > > > > > > > > > > > > > >>>    This has been discussed before on the
> > mailing
> > > > > list.
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>> So I think the bottom line for me is that
> > having
> > > > the
> > > > > > > > session
> > > > > > > > > ID
> > > > > > > > > > > and
> > > > > > > > > > > > > > >>> session epoch, while it adds two extra fields,
> > > > > reduces
> > > > > > > > > > > operational
> > > > > > > > > > > > > > >>> complexity and increases debuggability.  It
> > avoids
> > > > > > tightly
> > > > > > > > > > > coupling
> > > > > > > > > > > > > us
> > > > > > > > > > > > > > >>> to assumptions about reliable ordered delivery
> > > > which
> > > > > > tend
> > > > > > > > to
> > > > > > > > > be
> > > > > > > > > > > > > violated
> > > > > > > > > > > > > > >>> in practice in multiple layers of the stack.
> > > > > Finally,
> > > > > > it
> > > > > > > > > > avoids
> > > > > > > > > > > > the
> > > > > > > > > > > > > > >>> necessity of refactoring NetworkClient.
> > > > > > > > > > > > > > >> So there are stacks out there that violate TCP
> > > > > > > > > > > > > > >> guarantees? And software still works? How can
> > > > > > > > > > > > > > >> this be? Can you elaborate a little on where
> > > > > > > > > > > > > > >> this can be violated? I am not very familiar
> > > > > > > > > > > > > > >> with virtualized environments, but they can't
> > > > > > > > > > > > > > >> really violate TCP contracts.
> > > > > > > > > > > > > > > TCP's guarantees of reliable, in-order
> > transmission
> > > > > > certainly
> > > > > > > > > can
> > > > > > > > > > > be
> > > > > > > > > > > > > > > violated.  For example, I once had to debug a
> > cluster
> > > > > > where a
> > > > > > > > > > > certain
> > > > > > > > > > > > > > > node had a network card which corrupted its
> > > > > transmissions
> > > > > > > > > > > > occasionally.
> > > > > > > > > > > > > > > With all the layers of checksums, you would think
> > > > that
> > > > > > this
> > > > > > > > was
> > > > > > > > > > not
> > > > > > > > > > > > > > > possible, but it happened.  We occasionally got
> > > > > corrupted
> > > > > > > > data
> > > > > > > > > > > > written
> > > > > > > > > > > > > > > to disk on the other end because of it.  Even
> > more
> > > > > > > > frustrating,
> > > > > > > > > > the
> > > > > > > > > > > > > data
> > > > > > > > > > > > > > > was not corrupted on disk on the sending node--
> > it
> > > > was
> > > > > a
> > > > > > bug
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > > > > > > network card driver that was injecting the
> > errors.
> > > > > > > > > > > > > > True, but your broker might as well read a
> > > > > > > > > > > > > > corrupted 600 GB size from the network and die with
> > > > > > > > > > > > > > OOM instantly.
> > > > > > > > > > > > >
> > > > > > > > > > > > > If you read 600 GB as the size from the network, you
> > will
> > > > > not
> > > > > > > > "die
> > > > > > > > > > with
> > > > > > > > > > > > > OOM instantly."  That would be a bug.  Instead, you
> > will
> > > > > > notice
> > > > > > > > > that
> > > > > > > > > > > 600
> > > > > > > > > > > > > GB is greater than max.message.bytes, and close the
> > > > > > connection.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Optimizing for still having functional software
> > > > > > > > > > > > > > under these circumstances is not reasonable. You
> > > > > > > > > > > > > > want to get rid of such a node ASAP and pray that
> > > > > > > > > > > > > > ZooKeeper's ticks get corrupted often enough that
> > > > > > > > > > > > > > it finally drops out of the cluster.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > There is a good reason that these kinds of things
> > > > > > > > > > > > > > https://issues.apache.org/jira/browse/MESOS-4105
> > > > > > > > > > > > > > don't end up as Kafka JIRAs. In the end you can't
> > > > > > > > > > > > > > run any software in these containers anymore.
> > > > > > > > > > > > > > Application layer checksums are a neat thing to
> > > > > > > > > > > > > > fail fast, but trying to cope with this probably
> > > > > > > > > > > > > > causes more harm than good. So I would argue that
> > > > > > > > > > > > > > we shouldn't try this for the fetch requests.
> > > > > > > > > > > > >
> > > > > > > > > > > > > One of the goals of Apache Kafka is to be "a
> > streaming
> > > > > > > > platform...
> > > > > > > > > > > > > [that] lets you store streams of records in a
> > > > > fault-tolerant
> > > > > > > > way."
> > > > > > > > > > For
> > > > > > > > > > > > > more information, see https://kafka.apache.org/intro
> > .
> > > > > > > > > > > Fault-tolerance
> > > > > > > > > > > > > is explicitly part of the goal of Kafka.  Prayer
> > should
> > > > be
> > > > > > > > > optional,
> > > > > > > > > > > not
> > > > > > > > > > > > > required, when running the software.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Crashing because someone sent you a bad packet is not
> > > > > > reasonable
> > > > > > > > > > > > > behavior.  It is a bug.  Similarly, bringing down the
> > > > whole
> > > > > > > > > cluster,
> > > > > > > > > > > > > which could be a hundred nodes, because someone had a
> > bad
> > > > > > network
> > > > > > > > > > adapter
> > > > > > > > > > > > > is not reasonable behavior.  It is perhaps
> > reasonable for
> > > > > the
> > > > > > > > > cluster
> > > > > > > > > > > to
> > > > > > > > > > > > > perform worse when hardware is having problems.  But
> > > > > that's a
> > > > > > > > > > different
> > > > > > > > > > > > > discussion.
> > > > > > > > > > > > >
> > > > > > > > > > > > > best,
> > > > > > > > > > > > > Colin
> > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > However, my point was not about TCP's guarantees
> > > > being
> > > > > > > > > violated.
> > > > > > > > > > > My
> > > > > > > > > > > > > > > point is that TCP's guarantees are only one small
> > > > > > building
> > > > > > > > > block
> > > > > > > > > > to
> > > > > > > > > > > > > > > build a robust distributed system.  TCP basically
> > > > just
> > > > > > says
> > > > > > > > > that
> > > > > > > > > > if
> > > > > > > > > > > > you
> > > > > > > > > > > > > > > get any bytes from the stream, you will get the
> > ones
> > > > > that
> > > > > > > > were
> > > > > > > > > > sent
> > > > > > > > > > > > by
> > > > > > > > > > > > > > > the sender, in the order they were sent.  TCP
> > does
> > > > not
> > > > > > > > > guarantee
> > > > > > > > > > > that
> > > > > > > > > > > > > > > the bytes you send will get there.  It does not
> > > > > guarantee
> > > > > > > > that
> > > > > > > > > if
> > > > > > > > > > > you
> > > > > > > > > > > > > > > close the connection, the other end will know
> > about
> > > > it
> > > > > > in a
> > > > > > > > > > timely
> > > > > > > > > > > > > > > fashion.
> > > > > > > > > > > > > > These are very powerful guarantees, and since we
> > > > > > > > > > > > > > use TCP we should piggyback everything that is
> > > > > > > > > > > > > > reasonable onto it. IMO there is no need to
> > > > > > > > > > > > > > reimplement correct sequencing again if you get
> > > > > > > > > > > > > > that from your transport layer. It saves you the
> > > > > > > > > > > > > > complexity, it makes your application behave much
> > > > > > > > > > > > > > more naturally, and it makes your api easier to
> > > > > > > > > > > > > > understand.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > There is literally nothing the kernel won't let
> > > > > > > > > > > > > > you decide, especially not any timings. The only
> > > > > > > > > > > > > > notable exception is TIME_WAIT, of usually 240
> > > > > > > > > > > > > > seconds, but that already has little to do with
> > > > > > > > > > > > > > the broker itself, and if we are running out of
> > > > > > > > > > > > > > usable ports because of this, then expiring fetch
> > > > > > > > > > > > > > requests won't help much anyway.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I hope I could strengthen the trust you have in
> > > > userland
> > > > > > TCP
> > > > > > > > > > > connection
> > > > > > > > > > > > > > management. It is really powerful and can be
> > exploited
> > > > > for
> > > > > > > > > maximum
> > > > > > > > > > > > gains
> > > > > > > > > > > > > > without much risk in my opinion.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It does not guarantee that the bytes will be
> > received
> > > > > in
> > > > > > a
> > > > > > > > > > > > > > > certain timeframe, and certainly doesn't
> > guarantee
> > > > that
> > > > > > if
> > > > > > > > you
> > > > > > > > > > > send a
> > > > > > > > > > > > > > > byte on connection X and then on connection Y,
> > that
> > > > the
> > > > > > > > remote
> > > > > > > > > > end
> > > > > > > > > > > > will
> > > > > > > > > > > > > > > read a byte on X before reading a byte on Y.
> > > > > > > > > > > > > > No one expects this from two independent paths of
> > > > > > > > > > > > > > any kind.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > best,
> > > > > > > > > > > > > > > Colin
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> Hope this made my view clearer, especially the
> > first
> > > > > > part.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Best Jan
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>> best,
> > > > > > > > > > > > > > >>> Colin
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>>
> > > > > > > > > > > > > > >>>> If an error such as NotLeaderForPartition is
> > > > > > > > > > > > > > >>>> returned for some partitions, the follower can
> > > > > > > > > > > > > > >>>> always send a full FetchRequest. Is there a
> > > > > > > > > > > > > > >>>> scenario in which only some of the partitions
> > > > > > > > > > > > > > >>>> in a FetchResponse are lost?
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> Thanks,
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> Jiangjie (Becket) Qin
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>> On Sat, Dec 2, 2017 at 2:37 PM, Colin McCabe<
> > > > > > > > > > cmcc...@apache.org
> > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >>>>
> > > > > > > > > > > > > > >>>>> On Fri, Dec 1, 2017, at 11:46, Dong Lin
> > wrote:
> > > > > > > > > > > > > > >>>>>> On Thu, Nov 30, 2017 at 9:37 AM, Colin
> > McCabe<
> > > > > > > > > > > > cmcc...@apache.org>
> > > > > > > > > > > > > > >>>>> wrote:
> > > > > > > > > > > > > > >>>>>>> On Wed, Nov 29, 2017, at 18:59, Dong Lin
> > wrote:
> > > > > > > > > > > > > > >>>>>>>> Hey Colin,
> > > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > > >>>>>>>> Thanks much for the update. I have a few
> > > > > questions
> > > > > > > > > below:
> > > > > > > > > > > > > > >>>>>>>>
> > > > > > > > > > > > > > >>>>>>>> 1. I am not very sure that we need Fetch
> > > > Session
> > > > > > > > Epoch.
> > > > > > > > > It
> > > > > > > > > > > > > seems that
> > > > > > > > > > > > > > >>>>>>>> Fetch
> > > > > > > > > > > > > > >>>>>>>> Session Epoch is only needed to help
> > leader
> > > > > > > > distinguish
> > > > > > > > > > > > between
> > > > > > > > > > > > > "a
> > > > > > > > > > > > > > >>>>> full
> > > > > > > > > > > > > > >>>>>>>> fetch request" and "a full fetch request
> > and
> > > > > > request a
> > > > > > > > > new
> > > > > > > > > > > > > > >>>>> incremental
> > > > > > > > > > > > > > >>>>>>>> fetch session". Alternatively, follower
> > can
> > > > also
> > > > > > > > > indicate
> > > > > > > > > > "a
> > > > > > > > > > > > > full
> > > > > > > > > > > > > > >>>>> fetch
> > > > > > > > > > > > > > >>>>>>>> request and request a new incremental
> > fetch
> > > > > > session"
> > > > > > > > by
> > > > > > > > > > > > setting
> > > > > > > > > > > > > Fetch
> > > > > > > > > > > > > > >>>>>>>> Session ID to -1 without using Fetch
> > Session
> > > > > > Epoch.
> > > > > > > > Does
> > > > > > > > > > > this
> > > > > > > > > > > > > make
> > > > > > > > > > > > > > >>>>> sense?
>>>>>>> Hi Dong,
>>>>>>>
>>>>>>> The fetch session epoch is very important for ensuring correctness.
>>>>>>> It prevents corrupted or incomplete fetch data due to network
>>>>>>> reordering or loss.
>>>>>>>
>>>>>>> For example, consider a scenario where the follower sends a fetch
>>>>>>> request to the leader.  The leader responds, but the response is
>>>>>>> lost because of network problems which affected the TCP session.
>>>>>>> In that case, the follower must establish a new TCP session and
>>>>>>> re-send the incremental fetch request.  But the leader does not
>>>>>>> know that the follower didn't receive the previous incremental
>>>>>>> fetch response.  It is only the incremental fetch epoch which lets
>>>>>>> the leader know that it needs to resend that data, and not data
>>>>>>> which comes afterwards.
>>>>>>>
>>>>>>> You could construct similar scenarios with message reordering,
>>>>>>> duplication, etc.  Basically, this is a stateful protocol on an
>>>>>>> unreliable network, and you need to know whether the follower got
>>>>>>> the previous data you sent before you move on.  And you need to
>>>>>>> handle issues like duplicated or delayed requests.  These issues do
>>>>>>> not affect the full fetch request, because it is not stateful-- any
>>>>>>> full fetch request can be understood and properly responded to in
>>>>>>> isolation.
>>>>>>>
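(For concreteness, here is a minimal Java sketch of the leader-side epoch
check described above.  All names -- FetchSessionSketch, checkEpoch, the
error constant -- are illustrative, not taken from the KIP or the Kafka
code base:)

    // A fetch session tracks the next epoch it expects from its follower.
    // A mismatch means a lost, duplicated, or reordered request, so the
    // leader refuses and the follower falls back to a full fetch request.
    class FetchSessionSketch {
        static final int INVALID_FETCH_SESSION_EPOCH = -1; // stand-in error code

        private int expectedEpoch = 1;  // next epoch expected from the follower

        // Returns 0 if the incremental fetch may be served, or an error code.
        synchronized int checkEpoch(int requestEpoch) {
            if (requestEpoch != expectedEpoch) {
                return INVALID_FETCH_SESSION_EPOCH;
            }
            expectedEpoch++;  // advance only after a successful match
            return 0;
        }
    }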
>>>>>> Thanks for the explanation. This makes sense. On the other hand I
>>>>>> would be interested in learning more about whether Becket's solution
>>>>>> can help simplify the protocol by not having the echo field and
>>>>>> whether that is worth doing.
>>>>> Hi Dong,
>>>>>
>>>>> I commented about this in the other thread.  A solution which doesn't
>>>>> maintain session information doesn't work here.
>>>>>
>>>>>>>> 2. It is said that Incremental FetchRequest will include
>>>>>>>> partitions whose fetch offset or maximum number of fetch bytes has
>>>>>>>> been changed. If the follower's logStartOffset of a partition has
>>>>>>>> changed, should this partition also be included in the next
>>>>>>>> FetchRequest to the leader? Otherwise, it may affect the handling
>>>>>>>> of DeleteRecordsRequest because the leader may not know the
>>>>>>>> corresponding data has been deleted on the follower.
>>>>>>> Yeah, the follower should include the partition if the
>>>>>>> logStartOffset has changed.  That should be spelled out on the KIP.
>>>>>>> Fixed.
>>>>>>>
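(On the follower side, that check might look like the following sketch.
The class and parameter names are hypothetical; the partition is added to
the next incremental FetchRequest whenever its fetch state changed,
including its logStartOffset:)

    class FollowerFetchStateSketch {
        // Decide whether a partition must appear in the next incremental
        // FetchRequest.  All names are illustrative.
        static boolean mustInclude(long lastSentFetchOffset,
                                   long curFetchOffset,
                                   long lastSentLogStartOffset,
                                   long curLogStartOffset) {
            return curFetchOffset != lastSentFetchOffset
                || curLogStartOffset != lastSentLogStartOffset;
        }
    }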
>>>>>>>> 3. In the section "Per-Partition Data", a partition is not
>>>>>>>> considered dirty if its log start offset has changed. Later in the
>>>>>>>> section "FetchRequest Changes", it is said that incremental fetch
>>>>>>>> responses will include a partition if its logStartOffset has
>>>>>>>> changed. It seems inconsistent. Can you update the KIP to clarify
>>>>>>>> it?
>>>>>>>>
>>>>>>> In the "Per-Partition Data" section, it does say that
>>>>>>> logStartOffset changes make a partition dirty, though, right?  The
>>>>>>> first bullet point is:
>>>>>>>
>>>>>>>> * The LogCleaner deletes messages, and this changes the log start
>>>>>>>> offset of the partition on the leader, or
>>>>>>>
>>>>>> Ah I see. I think I didn't notice this because the statement assumes
>>>>>> that the LogStartOffset in the leader only changes due to the
>>>>>> LogCleaner. In fact the LogStartOffset can change on the leader due
>>>>>> to either log retention or DeleteRecordsRequest. I haven't verified
>>>>>> whether the LogCleaner can change the LogStartOffset though. It may
>>>>>> be a bit better to just say that a partition is considered dirty if
>>>>>> the LogStartOffset changes.
>>>>> I agree.  It should be straightforward to just resend the partition
>>>>> if the logStartOffset changes.
>>>>>
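(A minimal sketch of the resulting leader-side dirty-partition check,
assuming the leader caches per-partition state between incremental
fetches.  The class and field names are hypothetical:)

    // A partition is "dirty" -- and therefore resent in the incremental
    // response -- whenever any of the tracked offsets has moved, including
    // the logStartOffset case settled above.
    class CachedPartitionSketch {
        private long fetchOffset;     // follower's last requested fetch offset
        private long highWatermark;   // high watermark last sent to the follower
        private long logStartOffset;  // log start offset last sent

        boolean isDirty(long curFetchOffset, long curHighWatermark,
                        long curLogStartOffset) {
            return curFetchOffset != fetchOffset
                || curHighWatermark != highWatermark
                || curLogStartOffset != logStartOffset;
        }
    }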
>>>>>>>> 4. In the "Fetch Session Caching" section, it is said that each
>>>>>>>> broker has a limited number of slots. How is this number
>>>>>>>> determined? Does this require a new broker config for this number?
>>>>>>> Good point.  I added two broker configuration parameters to control
>>>>>>> this number.
>>>>>>>
>>>>>> I am curious to see whether we can avoid some of these new configs.
>>>>>> For example, incremental.fetch.session.cache.slots.per.broker is
>>>>>> probably not necessary, because if a leader knows that a
>>>>>> FetchRequest comes from a follower, we probably want the leader to
>>>>>> always cache the information from that follower. Does this make
>>>>>> sense?
>>>>> Yeah, maybe we can avoid having
>>>>> incremental.fetch.session.cache.slots.per.broker.
>>>>>
>>>>>> Maybe we can discuss the config later after there is agreement on
>>>>>> what the protocol would look like.
>>>>>>
>>>>>>>> What is the error code if broker does not have new log for the
>>>>>>>> incoming FetchRequest?
>>>>>>> Hmm, is there a typo in this question?  Maybe you meant to ask what
>>>>>>> happens if there is no new cache slot for the incoming
>>>>>>> FetchRequest?  That's not an error-- the incremental fetch session
>>>>>>> ID just gets set to 0, indicating no incremental fetch session was
>>>>>>> created.
>>>>>>>
>>>>>> Yeah there is a typo. You have answered my question.
>>>>>>
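(Sketched below is that behavior: a session cache with a bounded number of
slots that hands back session ID 0 -- "no incremental session" -- rather
than an error when it is full.  Class and method names are invented for
illustration:)

    import java.util.HashMap;
    import java.util.Map;

    class FetchSessionCacheSketch {
        static final int NO_SESSION = 0;  // no incremental fetch session created

        private final int maxSlots;
        private final Map<Integer, Object> sessions = new HashMap<>();
        private int nextId = 1;

        FetchSessionCacheSketch(int maxSlots) {
            this.maxSlots = maxSlots;
        }

        // Returns a new session ID, or NO_SESSION if every slot is taken.
        // In the latter case the client simply keeps issuing full fetches.
        synchronized int tryCreateSession(Object sessionState) {
            if (sessions.size() >= maxSlots) {
                return NO_SESSION;
            }
            int id = nextId++;
            sessions.put(id, sessionState);
            return id;
        }
    }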
>>>>>>>> 5. Can you clarify what happens if the follower adds a partition
>>>>>>>> to the ReplicaFetcherThread after receiving LeaderAndIsrRequest?
>>>>>>>> Does the leader need to generate a new session for this
>>>>>>>> ReplicaFetcherThread or does it re-use the existing session?  If
>>>>>>>> it uses a new session, is the old session actively deleted from
>>>>>>>> the slot?
>>>>>>> The basic idea is that you can't make changes, except by sending a
>>>>>>> full fetch request.  However, perhaps we can allow the client to
>>>>>>> re-use its existing session ID.  If the client sets sessionId = id,
>>>>>>> epoch = 0, it could re-initialize the session.
>>>>>>>
>>>>>> Yeah I agree with the basic idea. We probably want to understand
>>>>>> more detail about how this works later.
>>>>> Sounds good.  I updated the KIP with this information.  A
>>>>> re-initialization should be exactly the same as an initialization,
>>>>> except that it reuses an existing ID.
>>>>>
>>>>> best,
>>>>> Colin
>>>>>
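(Putting the two preceding points together, the request dispatch implied
by this discussion looks roughly like the sketch below.  The class, enum,
and method names are hypothetical, and the final KIP may differ:)

    class FetchRequestKindSketch {
        enum FetchKind { FULL_NEW_SESSION, FULL_REINITIALIZE, INCREMENTAL }

        // (sessionId, epoch) = (0, 0): full fetch; the broker may create a
        // new session.  (id, 0): full fetch that re-initializes an existing
        // session, reusing its ID.  (id, epoch > 0): incremental fetch,
        // validated against the session's expected epoch.
        static FetchKind classify(int sessionId, int epoch) {
            if (sessionId == 0 && epoch == 0) return FetchKind.FULL_NEW_SESSION;
            if (epoch == 0) return FetchKind.FULL_REINITIALIZE;
            return FetchKind.INCREMENTAL;
        }
    }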
>>>>>>>> BTW, I think it may be useful if the KIP can include an example
>>>>>>>> workflow of how this feature will be used in case of partition
>>>>>>>> change and so on.
>>>>>>> Yeah, that might help.
>>>>>>>
>>>>>>> best,
>>>>>>> Colin
>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>> On Wed, Nov 29, 2017 at 12:13 PM, Colin McCabe <cmcc...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I updated the KIP with the ideas we've been discussing.
>>>>>>>>>
>>>>>>>>> best,
>>>>>>>>> Colin
>>>>>>>>>
>>>>>>>>> On Tue, Nov 28, 2017, at 08:38, Colin McCabe wrote:
>>>>>>>>>> On Mon, Nov 27, 2017, at 22:30, Jan Filipiak wrote:
>>>>>>>>>>> Hi Colin, thank you for this KIP, it can become a really useful
>>>>>>>>>>> thing.
>>>>>>>>>>>
>>>>>>>>>>> I just scanned through the discussion so far and wanted to
>>>>>>>>>>> start a thread to make a decision about keeping the cache with
>>>>>>>>>>> the Connection / Session or having some sort of UUID-indexed
>>>>>>>>>>> global Map.
>>>>>>>>>>>
>>>>>>>>>>> Sorry if that has been settled already and I missed it. In this
>>>>>>>>>>> case could anyone point me to the discussion?
>>>>>>>>>> Hi Jan,
>>>>>>>>>>
>>>>>>>>>> I don't think anyone has discussed the idea of tying the cache
>>>>>>>>>> to an individual TCP session yet.  I agree that since the cache
>>>>>>>>>> is intended to be used only by a single follower or client, it's
>>>>>>>>>> an interesting thing to think about.
>>>>>>>>>>
>>>>>>>>>> I guess the obvious disadvantage is that whenever your TCP
>>>>>>>>>> session drops, you have to make a full fetch request rather than
>>>>>>>>>> an incremental one.  It's not clear to me how often this happens
>>>>>>>>>> in practice -- it probably depends a lot on the quality of the
>>>>>>>>>> network.  From a code perspective, it might also be a bit
>>>>>>>>>> difficult to access data associated with the Session from
>>>>>>>>>> classes like KafkaApis (although we could refactor it to make
>>>>>>>>>> this easier).
>>>>>>>>>>
>>>>>>>>>> It's also clear that even if we tie the cache to the session, we
>>>>>>>>>> still have to have limits on the number of caches we're willing
>>>>>>>>>> to create.  And probably we should reserve some cache slots for
>>>>>>>>>> each follower, so that clients don't take all of them.
>>>>>>>>>>
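(One way to reserve slots for followers, sketched with invented names and
pool sizes: followers draw from a reserved pool first, so ordinary clients
can never occupy every slot:)

    class SlotPoolSketch {
        private int followerSlotsFree;  // reserved for replica followers
        private int sharedSlotsFree;    // available to any client

        SlotPoolSketch(int reservedForFollowers, int shared) {
            this.followerSlotsFree = reservedForFollowers;
            this.sharedSlotsFree = shared;
        }

        // Returns true if a slot was acquired; false means the caller must
        // fall back to full fetch requests.
        synchronized boolean tryAcquire(boolean isFollower) {
            if (isFollower && followerSlotsFree > 0) {
                followerSlotsFree--;
                return true;
            }
            if (sharedSlotsFree > 0) {
                sharedSlotsFree--;
                return true;
            }
            return false;
        }
    }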
>>>>>>>>>>> I'd rather see a protocol in which the client hints to the
>>>>>>>>>>> broker that it is going to use the feature, instead of the
>>>>>>>>>>> client realizing that the broker just offered the feature
>>>>>>>>>>> (regardless of protocol version, which should only indicate
>>>>>>>>>>> that the feature would be usable).
>>>>>>>>>> Hmm.  I'm not sure what you mean by "hinting."  I do think that
>>>>>>>>>> the server should have the option of not accepting incremental
>>>>>>>>>> requests from specific clients, in order to save memory space.
>>>>>>>>>>
>>>>>>>>>>> This seems to work better with per-connection/session-attached
>>>>>>>>>>> metadata than with a Map, and could allow for easier client
>>>>>>>>>>> implementations. It would also make client-side code easier, as
>>>>>>>>>>> there wouldn't be any cache-miss error messages to handle.
>>>>>>>>>> It is nice not to have to handle cache-miss responses, I agree.
>>>>>>>>>> However, TCP sessions aren't exposed to most of our client-side
>>>>>>>>>> code.  For example, when the Producer creates a message and
>>>>>>>>>> hands it off to the NetworkClient, the NC will transparently
>>>>>>>>>> re-connect and re-send a message if the first send failed.  The
>>>>>>>>>> higher-level code will not be informed about whether the TCP
>>>>>>>>>> session was re-established, whether an existing TCP session was
>>>>>>>>>> used, and so on.  So overall I would still lean towards not
>>>>>>>>>> coupling this to the TCP session...
>>>>>>>>>>
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>>
>>>>>>>>>>> Thank you again for the KIP. And again, if this was clarified
>>>>>>>>>>> already, please drop me a hint where I could read about it.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> On 21.11.2017 22:02, Colin McCabe wrote:
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I created a KIP to improve the scalability and latency of
>>>>>>>>>>>> FetchRequest:
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
>>>>>>>>>>>>
>>>>>>>>>>>> Please take a look.
>>>>>>>>>>>>
>>>>>>>>>>>> cheers,
>>>>>>>>>>>> Colin