Hey Jun,

I see. Sounds good. Yeah, it is probably simpler to leave this to another
KIP in the future.

Thanks for all the comments. Since there are no further comments from the
community, I will open the voting thread.

Thanks,
Dong

On Mon, Dec 11, 2017 at 5:37 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> The case that I am thinking of is a network partition. Suppose one deploys
> a stretched cluster across multiple AZs in the same region. If the machines
> in one AZ can't communicate with brokers in other AZs due to a network
> issue, the brokers in that AZ won't get any new metadata.
>
> We can potentially solve this problem by requiring some kind of regular
> heartbeats between the controller and the broker. This may need some more
> thoughts. So, it's probably fine to leave this to another KIP in the
> future.
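
A speculative sketch of the heartbeat idea above, for illustration only: the
broker records the last time it heard from the controller and treats its
metadata as possibly stale after a timeout. The class and method names here
are hypothetical and not part of any Kafka API.

    // Speculative sketch: flag the broker's metadata as possibly stale if no
    // heartbeat (or UpdateMetadataRequest) arrives from the controller in time.
    import java.util.concurrent.TimeUnit;

    final class ControllerLivenessMonitor {
        private final long timeoutMs;
        private volatile long lastControllerContactMs;

        ControllerLivenessMonitor(long timeoutMs) {
            this.timeoutMs = timeoutMs;
            this.lastControllerContactMs = System.currentTimeMillis();
        }

        /** Called whenever a heartbeat or UpdateMetadataRequest arrives from the controller. */
        void recordControllerContact() {
            lastControllerContactMs = System.currentTimeMillis();
        }

        /** True if this broker may be partitioned away from the controller. */
        boolean metadataPossiblyStale() {
            return System.currentTimeMillis() - lastControllerContactMs > timeoutMs;
        }

        public static void main(String[] args) throws InterruptedException {
            ControllerLivenessMonitor monitor =
                new ControllerLivenessMonitor(TimeUnit.SECONDS.toMillis(1));
            Thread.sleep(1500);  // simulate missing heartbeats
            System.out.println("stale? " + monitor.metadataPossiblyStale());  // stale? true
        }
    }
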
>
> Thanks,
>
> Jun
>
> On Mon, Dec 11, 2017 at 2:55 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks for the comment. I am open to improving this KIP to address more
> > problems. I probably need more help in understanding the current problem
> > with consumers using outdated metadata and whether it is easier to
> > address it together with this KIP.
> >
> > I agree that a consumer can potentially talk to an old leader for a long
> > time even after this KIP. But after this KIP, the consumer probably
> > should not get an OffsetOutOfRangeException and therefore will not cause
> > the offset rewind issue. So the only problem is that the consumer will
> > not be able to fetch data until it has updated its metadata. It seems
> > that this situation can only happen if the broker is too slow in
> > processing the LeaderAndIsrRequest, since otherwise the consumer will be
> > forced to update metadata due to NotLeaderForPartitionException. So the
> > problem we are having here is that the consumer will not be able to
> > fetch data if some broker is too slow in processing the
> > LeaderAndIsrRequest.
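
A minimal sketch of the client-side path described above, assuming
hypothetical Fetcher/MetadataCache interfaces rather than the real consumer
internals: a NotLeaderForPartition error forces a metadata refresh before the
fetch is retried.

    // Hypothetical sketch: on a NOT_LEADER_FOR_PARTITION error the client
    // requests a metadata refresh and retries, so it can only stall while the
    // new leader's broker has not yet processed the LeaderAndIsrRequest.
    import java.time.Duration;

    final class FetchRetryLoop {
        interface MetadataCache {
            void requestUpdate();
            void awaitUpdate(Duration timeout) throws InterruptedException;
        }
        interface FetchResult { boolean isNotLeaderError(); }
        interface Fetcher { FetchResult fetch(String topic, int partition, long offset); }

        static FetchResult fetchWithRefresh(Fetcher fetcher, MetadataCache metadata,
                                            String topic, int partition, long offset)
                throws InterruptedException {
            while (true) {
                FetchResult result = fetcher.fetch(topic, partition, offset);
                if (!result.isNotLeaderError()) {
                    return result;                        // served by the current leader
                }
                metadata.requestUpdate();                 // stale leader: force a refresh
                metadata.awaitUpdate(Duration.ofSeconds(5));
            }
        }
    }
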
> >
> > Because Kafka propagates the LeaderAndIsrRequest asynchronously to all
> > brokers in the cluster, there will always be a period of time during a
> > leadership change when the consumer cannot fetch data for the partition.
> > Thus it seems more like a broker-side performance issue than a
> > client-side correctness issue. My gut feeling is that it is not causing
> > as much of a problem as the one to be fixed in this KIP. And if we were
> > to address it, we would probably need to make changes on the broker
> > side, e.g. with a prioritized queue for controller-related requests,
> > which may be kind of orthogonal to this KIP. I am not very sure it will
> > be easier to address it with the change in this KIP. Do you have any
> > recommendation?
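
For illustration, a rough sketch of the prioritized-queue idea mentioned
above: controller requests are ordered ahead of ordinary client requests so a
slow broker applies leadership changes first. This is only a data-structure
sketch, not the actual broker request channel.

    // Illustrative only: controller requests jump ahead of client requests so
    // a slow broker handles LeaderAndIsrRequest/UpdateMetadataRequest before
    // serving more fetches and produces.
    import java.util.concurrent.PriorityBlockingQueue;

    final class PrioritizedRequestQueue {
        enum Priority { CONTROLLER, CLIENT }          // lower ordinal = higher priority

        static final class Request implements Comparable<Request> {
            final Priority priority;
            final long sequence;                      // preserves FIFO order within a priority
            final Runnable handler;
            Request(Priority priority, long sequence, Runnable handler) {
                this.priority = priority; this.sequence = sequence; this.handler = handler;
            }
            @Override public int compareTo(Request other) {
                int byPriority = priority.compareTo(other.priority);
                return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
            }
        }

        private final PriorityBlockingQueue<Request> queue = new PriorityBlockingQueue<>();
        private long nextSequence = 0;

        synchronized void enqueue(Priority priority, Runnable handler) {
            queue.put(new Request(priority, nextSequence++, handler));
        }

        void processOne() throws InterruptedException {
            queue.take().handler.run();               // controller requests drain first
        }
    }
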
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Dec 11, 2017 at 1:51 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the reply.
> > >
> > > My suggestion of forcing the metadata refresh from the controller may
> not
> > > work in general since the cached controller could be outdated too. The
> > > general problem is that if a consumer's metadata is outdated, it may
> get
> > > stuck with the old leader for a long time. We can address the issue of
> > > detecting outdated metadata in a separate KIP in the future if you
> didn't
> > > intend to address it in this KIP.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks much for your comments. Given that the client needs to
> > > > deserialize the metadata anyway, the extra overhead of checking the
> > > > per-partition version for every partition should not be a big
> > > > concern. Thus it makes sense to use the leader epoch as the
> > > > per-partition version instead of creating a global metadata version.
> > > > I will update the KIP to do that.
> > > >
> > > > Regarding the detection of outdated metadata, I think it is possible
> > > > to ensure that the client gets the latest metadata by fetching it
> > > > from the controller. Note that this requires extra logic in the
> > > > controller such that the controller updates metadata directly in
> > > > memory without requiring an UpdateMetadataRequest. But I am not sure
> > > > about the main motivation for this at the moment, and it also makes
> > > > the controller more of a bottleneck in the cluster, which we probably
> > > > want to avoid.
> > > >
> > > > I think we can probably keep the current way of ensuring metadata
> > > > freshness. Currently the client will be forced to refresh metadata if
> > > > the broker returns an error (e.g. NotLeaderForPartition) due to
> > > > outdated metadata or if the metadata does not contain the partition
> > > > that the client needs. In the future, as you previously suggested, we
> > > > can include the per-partition leaderEpoch in the
> > > > FetchRequest/ProduceRequest such that the broker can return an error
> > > > if the epoch is smaller than the cached epoch in the broker. Given
> > > > that this adds more complexity to Kafka, I think we can probably
> > > > think about that later when we have a specific use-case or problem to
> > > > solve that needs up-to-date metadata. Does this sound OK?
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > >
> > > > On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Thanks for the reply. A few more points below.
> > > > >
> > > > > For dealing with how to prevent a consumer from switching from a
> > > > > new leader to an old leader, your suggestion of refreshing metadata
> > > > > on consumer restart until it sees a metadata version >= the one
> > > > > associated with the offset works too, as long as we guarantee that
> > > > > the cached metadata versions on the brokers only go up.
> > > > >
> > > > > The second discussion point is on whether the metadata versioning
> > > > > should be per partition or global. For the partition-level
> > > > > versioning, you were concerned about the performance. Given that
> > > > > metadata updates are rare, I am not sure if it's a big concern
> > > > > though. Doing a million if tests is probably going to take less
> > > > > than 1ms. Another thing is that the metadata version seems to need
> > > > > to survive controller failover. In your current approach, a
> > > > > consumer may not be able to wait on the right version of the
> > > > > metadata after a restart, since the metadata version may have been
> > > > > recycled on the server side due to a controller failover while the
> > > > > consumer is down. The partition-level leaderEpoch survives
> > > > > controller failover and won't have this issue.
> > > > >
> > > > > Lastly, neither your proposal nor mine addresses the issue of how
> > > > > to guarantee that a consumer detects that its metadata is outdated.
> > > > > Currently, the consumer is not guaranteed to fetch metadata from
> > > > > every broker within some bounded period of time. Maybe this is out
> > > > > of the scope of your KIP. But one idea is to force the consumer to
> > > > > refresh metadata from the controller periodically.
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks much for the comments. Great point particularly regarding
> > > (3). I
> > > > > > haven't thought about this before.
> > > > > >
> > > > > > It seems that there are two possible ways the version number can
> > > > > > be used. One solution is for the client to check the version
> > > > > > number at the time it receives a MetadataResponse: if the version
> > > > > > number in the MetadataResponse is smaller than the version number
> > > > > > in the client's cache, the client will be forced to fetch
> > > > > > metadata again. Another solution, as you have suggested, is for
> > > > > > the broker to check the version number at the time it receives a
> > > > > > request from the client. The broker will reject the request if
> > > > > > the version is smaller than the version in the broker's cache.
> > > > > >
> > > > > > I am not very sure that the second solution can address the
> > > > > > problem here. In the scenario described in the JIRA ticket, the
> > > > > > broker's cache may be outdated because it has not processed the
> > > > > > LeaderAndIsrRequest from the controller. Thus it may still
> > > > > > process the client's request even if the version in the client's
> > > > > > request is actually outdated. Does this make sense?
> > > > > >
> > > > > > IMO, it seems that we can address problem (3) by saving the
> > > > > > metadata version together with the offset. After the consumer
> > > > > > starts, it will keep fetching metadata until the metadata version
> > > > > > is >= the version saved with the offset of this partition.
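
A small sketch of that restart path, assuming the committed offset carries
the metadata version it was fetched under; the CommittedPosition and Metadata
types here are hypothetical stand-ins for whatever the KIP would add.

    // Hypothetical sketch: block on startup until the locally cached metadata
    // is at least as new as the version stored next to the committed offset.
    import java.time.Duration;

    final class RestartValidation {
        static final class CommittedPosition {
            final long offset;
            final int metadataVersion;                // saved together with the offset
            CommittedPosition(long offset, int metadataVersion) {
                this.offset = offset; this.metadataVersion = metadataVersion;
            }
        }

        interface Metadata {
            int currentVersion(String topic, int partition);
            void requestUpdate();
            void awaitUpdate(Duration timeout) throws InterruptedException;
        }

        static long resumeFrom(Metadata metadata, String topic, int partition,
                               CommittedPosition committed) throws InterruptedException {
            // Keep refreshing until the cached metadata is no older than the
            // metadata the offset was committed under.
            while (metadata.currentVersion(topic, partition) < committed.metadataVersion) {
                metadata.requestUpdate();
                metadata.awaitUpdate(Duration.ofSeconds(5));
            }
            return committed.offset;                  // safe to start fetching from here
        }
    }
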
> > > > > >
> > > > > > Regarding problems (1) and (2): currently we use the version
> > > > > > number in the MetadataResponse to ensure that the metadata does
> > > > > > not go back in time. There are two alternative solutions to
> > > > > > address problems (1) and (2). One solution is for the client to
> > > > > > enumerate all partitions in the MetadataResponse, compare their
> > > > > > epochs with those in the cached metadata, and reject the
> > > > > > MetadataResponse iff any leader epoch is smaller. The main
> > > > > > concern is that the MetadataResponse currently contains
> > > > > > information about all partitions in the entire cluster, so it may
> > > > > > slow down the client's performance if we were to do it. The other
> > > > > > solution is for the client to enumerate partitions only for
> > > > > > topics registered in org.apache.kafka.clients.Metadata, which
> > > > > > will be an empty set for the producer and the set of subscribed
> > > > > > partitions for the consumer. But this degrades to all topics if
> > > > > > the consumer subscribes to topics in the cluster by pattern.
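
Roughly, the epoch comparison in either alternative could look like the
sketch below; what changes between the two is just which partitions are in
the cached map. The types are illustrative and not the actual
org.apache.kafka.clients.Metadata API.

    // Illustrative sketch: accept a MetadataResponse only if no tracked
    // partition's leader epoch moves backwards relative to the cached metadata.
    import java.util.Map;

    final class MetadataEpochCheck {
        static final class TopicPartition {
            final String topic; final int partition;
            TopicPartition(String topic, int partition) {
                this.topic = topic; this.partition = partition;
            }
            @Override public boolean equals(Object o) {
                if (!(o instanceof TopicPartition)) return false;
                TopicPartition other = (TopicPartition) o;
                return partition == other.partition && topic.equals(other.topic);
            }
            @Override public int hashCode() { return 31 * topic.hashCode() + partition; }
        }

        /**
         * @param cachedEpochs   leader epochs currently cached for tracked partitions
         * @param responseEpochs leader epochs carried in the incoming MetadataResponse
         * @return true if the response may be applied, false if it is outdated
         */
        static boolean shouldApply(Map<TopicPartition, Integer> cachedEpochs,
                                   Map<TopicPartition, Integer> responseEpochs) {
            for (Map.Entry<TopicPartition, Integer> entry : cachedEpochs.entrySet()) {
                Integer newEpoch = responseEpochs.get(entry.getKey());
                if (newEpoch != null && newEpoch < entry.getValue()) {
                    return false;                     // response is older than what we have
                }
            }
            return true;
        }
    }
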
> > > > > >
> > > > > > Note that the client will only be forced to update metadata if
> > > > > > the version in the MetadataResponse is smaller than the version
> > > > > > in the cached metadata. In general this should not be a problem.
> > > > > > It can be a problem only if some broker is particularly slower
> > > > > > than other brokers in processing the UpdateMetadataRequest. When
> > > > > > this is the case, it means that the broker is also particularly
> > > > > > slow in processing the LeaderAndIsrRequest, which can cause
> > > > > > problems anyway because some partitions will probably have no
> > > > > > leader during this period. I am not sure problems (1) and (2)
> > > > > > cause more problems than what we already have.
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Hi, Dong,
> > > > > > >
> > > > > > > Great finding on the issue. It's a real problem. A few comments
> > > > > > > about the KIP. (1) I am not sure about updating
> > > > > > > controller_metadata_epoch on every UpdateMetadataRequest.
> > > > > > > Currently, the controller can send an UpdateMetadataRequest
> > > > > > > when there is no actual metadata change. Doing this may require
> > > > > > > an unnecessary metadata refresh on the client. (2)
> > > > > > > controller_metadata_epoch is global across all topics. This
> > > > > > > means that a client may be forced to update its metadata even
> > > > > > > when the metadata for the topics that it cares about hasn't
> > > > > > > changed. (3) It doesn't seem that the KIP handles the corner
> > > > > > > case when a consumer is restarted. Say a consumer reads from
> > > > > > > the new leader, commits the offset and then is restarted. On
> > > > > > > restart, the consumer gets outdated metadata and fetches from
> > > > > > > the old leader. Then, the consumer will get into the offset out
> > > > > > > of range issue.
> > > > > > >
> > > > > > > Given the above, I am thinking of the following approach. We
> > > > > > > actually already have metadata versioning at the partition
> > > > > > > level. Each leader has a leader epoch which is monotonically
> > > > > > > increasing. We can potentially propagate the leader epoch back
> > > > > > > in the metadata response and the clients can cache that. This
> > > > > > > solves issues (1) and (2). To solve (3), when saving an offset,
> > > > > > > we could save both the offset and the corresponding leader
> > > > > > > epoch. When fetching data, the consumer provides both the
> > > > > > > offset and the leader epoch. A leader will only serve the
> > > > > > > request if its leader epoch is equal to or greater than the
> > > > > > > leader epoch from the consumer. To achieve this, we need to
> > > > > > > change the fetch request protocol and the offset commit API,
> > > > > > > which requires some more thought.
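
To make the last part concrete, a sketch of the leader-side check under that
scheme: the consumer sends the leader epoch it saved with the offset, and the
leader only serves the fetch if its own epoch is at least as large. The error
names and types here are placeholders, not the final protocol.

    // Placeholder sketch of the proposed fetch validation.
    final class LeaderEpochFetchCheck {
        enum Error { NONE, UNKNOWN_LEADER_EPOCH, NOT_LEADER_FOR_PARTITION }

        static final class PartitionState {
            final boolean isLeader;
            final int leaderEpoch;
            PartitionState(boolean isLeader, int leaderEpoch) {
                this.isLeader = isLeader; this.leaderEpoch = leaderEpoch;
            }
        }

        static Error validateFetch(PartitionState state, int consumerLeaderEpoch) {
            if (!state.isLeader) {
                return Error.NOT_LEADER_FOR_PARTITION;  // consumer's metadata points at the wrong broker
            }
            if (state.leaderEpoch < consumerLeaderEpoch) {
                // This broker has not caught up to the leadership change the
                // consumer already observed; serving the fetch could rewind it.
                return Error.UNKNOWN_LEADER_EPOCH;
            }
            return Error.NONE;                          // epoch is equal or newer: safe to serve
        }
    }
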
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Bump up the thread.
> > > > > > > >
> > > > > > > > It will be great to have more comments on whether we should
> > > > > > > > do it or whether there is a better way to address the
> > > > > > > > motivation of this KIP.
> > > > > > > >
> > > > > > > > On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > I don't have an interesting rejected alternative solution to
> > > > > > > > > put in the KIP. If there is a good alternative solution from
> > > > > > > > > anyone in this thread, I am happy to discuss it and update
> > > > > > > > > the KIP accordingly.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Dong
> > > > > > > > >
> > > > > > > > > On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > >> It is clearer now.
> > > > > > > > >>
> > > > > > > > >> I noticed that the Rejected Alternatives section is empty.
> > > > > > > > >> Have you considered any alternatives?
> > > > > > > > >>
> > > > > > > > >> Cheers
> > > > > > > > >>
> > > > > > > > >> On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > >>
> > > > > > > > >> > Ted, thanks for catching this. I have updated the
> > > > > > > > >> > sentence to make it readable.
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> > Dong
> > > > > > > > >> >
> > > > > > > > >> > On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> > > > > > > > >> >
> > > > > > > > >> > > bq. It the controller_epoch of the incoming
> > > > > > > > >> > > MetadataResponse, or if the controller_epoch is the
> > > > > > > > >> > > same but the controller_metadata_epoch
> > > > > > > > >> > >
> > > > > > > > >> > > Can you update the above sentence so that the
> > > > > > > > >> > > intention is clearer?
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks
> > > > > > > > >> > >
> > > > > > > > >> > > > On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > Hi all,
> > > > > > > > >> > > >
> > > > > > > > >> > > > I have created KIP-232: Detect outdated metadata by
> > > > > > > > >> > > > adding ControllerMetadataEpoch field:
> > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field
> > > > > > > > >> > > >
> > > > > > > > >> > > > The KIP proposes to add fields in MetadataResponse
> > > > > > > > >> > > > and UpdateMetadataRequest so that the client can
> > > > > > > > >> > > > reject outdated metadata and avoid an unnecessary
> > > > > > > > >> > > > OffsetOutOfRangeException. Otherwise there is
> > > > > > > > >> > > > currently a race condition that can cause the
> > > > > > > > >> > > > consumer to reset its offset, which negatively
> > > > > > > > >> > > > affects the consumer's availability.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Feedback and suggestions are welcome!
> > > > > > > > >> > > >
> > > > > > > > >> > > > Regards,
> > > > > > > > >> > > > Dong
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
