Hi Deng,

Thanks for the feedback.

On Mon, Nov 18, 2019 at 6:56 PM deng ziming <dengziming1...@gmail.com>
wrote:

> Hi, I reviewed the current code. ProducerMetadata maintains an expiry
> threshold for every topic, and every time we write to a topic we set
> the expiry time to -1 to indicate that it should be updated. This does work
> to reduce the size of the topic working set, but the producer will continue
> fetching metadata for these topics in every metadata request for the full
> expiry duration.
>

Indeed, you are correct, I terribly misread the code here. Fortunately this
was only a minor optimization in the KIP that's no longer necessary.


> and we can improve the situation in two ways:
>     1. We maintain a refresh threshold for every topic, for example
> 0.8 * expiry_threshold, and when we send a `MetadataRequest` to brokers we
> request only unknownLeaderTopics + unknownPartitionTopics + topics that
> have reached the refresh threshold.
>

Right, this is similar to what I suggested, with a larger window on the
"staleness" that permits batching to an appropriate size (except that if
there are any unknown topics, you'd want to issue the request immediately).
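
As a rough illustration of that policy, here is a minimal sketch in Python
(for brevity, not the producer's Java). Every name and threshold in it
(`topics_to_request`, `REFRESH_FRACTION`, `URGENT_FRACTION`, `MIN_BATCH`) is
hypothetical and chosen only for the example; it is not the producer's code.

```python
from dataclasses import dataclass

REFRESH_FRACTION = 0.5   # assumed: a topic is "stale" at half of the max age
URGENT_FRACTION = 0.9    # assumed: force a request once a topic nears expiry
MIN_BATCH = 10           # assumed: otherwise wait for this many stale topics


@dataclass
class TopicState:
    name: str
    last_refresh_ms: int


def topics_to_request(known, unknown, now_ms, max_age_ms):
    """Return the topics for the next metadata request, or [] to keep waiting."""
    stale = [t.name for t in known
             if now_ms - t.last_refresh_ms >= REFRESH_FRACTION * max_age_ms]
    urgent = any(now_ms - t.last_refresh_ms >= URGENT_FRACTION * max_age_ms
                 for t in known)
    # Unknown topics hold up sends, so they trigger a request immediately and
    # carry any stale topics along in the same RPC.
    if unknown or urgent or len(stale) >= MIN_BATCH:
        return sorted(unknown) + stale
    return []
```

With a 300-second max age, a single topic that is 160 seconds old is stale but
does not trigger a request by itself; it rides along with the next request
caused by an unknown topic, a topic close to expiry, or a full batch.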



>     2. We don't invoke KafkaProducer#waitOnMetadata when we call
> KafkaProducer#send, because we just send data to the RecordAccumulator, and
> before we send data to brokers we invoke RecordAccumulator#ready(). So
> we only need to invoke waitOnMetadata to block when (number of topics that
> have reached the refresh threshold) > (number of all known topics) * 0.2.
>

For new (uncached) topics, one problem here is that we don't know which
partition to map a record to in the event that it has a key or custom
partitioner, so the RecordAccumulator wouldn't know which batch/broker it
belongs to. We'd need an intermediate record queue that subsequently moved
the records into the RecordAccumulator once metadata resolution was complete.
For known topics, we don't currently block at all in waitOnMetadata.
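
To make the intermediate queue concrete, here is a minimal sketch in Python
(again for brevity, not the producer's Java). The class `PendingRecords`, its
methods, and the toy partitioner are all hypothetical; keys are assumed to be
non-null bytes, and timeouts and memory limits are left out.

```python
from collections import defaultdict, deque


class PendingRecords:
    """Parks records for topics whose partition count is not yet known."""

    def __init__(self):
        self._pending = defaultdict(deque)   # topic -> queued (key, value)
        self.batches = defaultdict(list)     # (topic, partition) -> values

    def send(self, topic, key, value, partition_counts):
        if topic in partition_counts:
            self._append(topic, key, value, partition_counts[topic])
        else:
            # No metadata yet: the partition cannot be computed, so park it.
            self._pending[topic].append((key, value))

    def on_metadata(self, topic, num_partitions):
        """Drain parked records, in arrival order, once metadata arrives."""
        for key, value in self._pending.pop(topic, ()):
            self._append(topic, key, value, num_partitions)

    def _append(self, topic, key, value, num_partitions):
        # Stand-in partitioner for illustration only; the default Kafka
        # partitioner hashes the serialized key with murmur2 instead.
        partition = sum(key) % num_partitions
        self.batches[(topic, partition)].append(value)
```

Draining in arrival order keeps per-key ordering intact, since records with
the same key land in the same partition batch in the order they were sent.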

The last major point of minimizing producer startup metadata RPCs may still
need to be improved, but this would be a large improvement on the current
situation.

Thanks,
Brian



> I think the above 2 ways are enough to solve the current problem.
>
> On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe <cmcc...@apache.org> wrote:
>
> > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
> > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe <cmcc...@apache.org>
> wrote:
> > >
> > > > Two seconds doesn't seem like a reasonable amount of time to leave
> for
> > the
> > > > metadata fetch.  Fetching halfway through the expiration period seems
> > more
> > > > reasonable.  It also doesn't require us to create a new configuration
> > key,
> > > > which is nice.
> > > >
> > > > Another option is to just do the metadata fetch every
> > metadata.max.age.ms,
> > > > but not expire the topic until we can't fetch the metadata for 2 *
> > > > metadata.max.age.ms.
> > > >
> > >
> > > I'd expect two seconds to be reasonable in the common case. Keep in
> mind
> > > that this doesn't affect correctness, and a control operation returning
> > > cached metadata should be on the order of milliseconds.
> > >
> >
> > Hi Brian,
> >
> > Thanks again for the KIP.
> >
> > I think the issue here is not the common case, but the uncommon case
> where
> > the metadata fetch takes longer than expected.  In that case, we don't
> want
> > to be in the position of having our metadata expire because we waited too
> > long to renew it.
> >
> > This is one reason why I think that the metadata expiration time should
> be
> > longer than the metadata refresh time.  In fact, it might be worth having
> > two separate configuration keys for these two values.  I could imagine a
> > user who is having trouble with metadata expiration wanting to increase
> the
> > metadata expiration time, but without increasing the metadata refresh
> > period.  In a sense, the metadata expiration time is like the ZK session
> > expiration time.  You might want to turn it up if the cluster is
> > experiencing load spikes.
> >
> > >
> > > But to the general
> > > point, defining the algorithm would mean enforcing it to fair accuracy,
> > > whereas if the suggestion is that it'll be performed at a reasonable
> > time,
> > > it allows for batching and other optimizations. Perhaps I shouldn't be
> > > regarding what's defined in a KIP to be contractual in these cases, but
> > you
> > > could consider a first implementation to collect topics whose metadata
> > has
> > > exceeded (metadata.max.age.ms / 2), and sending the batch once a
> > > constituent topic's metadata is near the expiry, or a sufficient number
> > of
> > > topics have been collected (10? 100? 1000?).
> > >
> >
> > I'm concerned that if we change the metadata caching strategy without
> > discussing it first, it may improve certain workloads but make others
> > worse.  We need to be concrete about what the proposed strategy is so
> that
> > we can really evaluate it.
> >
> > >
> > >
> > > > We should be specific about what happens if the first few metadata
> > fetches
> > > > fail.  Do we use exponential backoff to decide when to resend?  It
> > seems
> > > > like we really should, for all the usual reasons (reduce the load on
> > > > brokers, ride out temporary service disruptions, etc.)  Maybe we
> could
> > have
> > > > an exponential retry backoff for each broker (in other words, we
> > should try
> > > > to contact a different broker before applying the backoff.)  I think
> > this
> > > > already sort of happens with the disconnect timeout, but we might
> need
> > a
> > > > more general solution.
> > > >
> > >
> > > I don't plan to change this behavior. Currently it retries after a
> fixed
> > > value of 'retry.backoff.ms' (defaults to 100 ms). It's possible that
> > > different brokers are attempted, but I haven't dug into it.
> > >
> >
> > I think it's critical to understand what the current behavior is before
> we
> > try to change it.  The difference between retrying the same broker and
> > trying a different one has a large impact on cluster load and
> > latency.  For what it's worth, I believe the behavior is the second one,
> > but it has been a while since I checked.  Let's figure this out.
> >
> > >
> > > > Thanks for the clarification.  Fully asynchronous is the way to go, I
> > > > agree.  I'm having trouble understanding how timeouts are handled in
> > the
> > > > KIP.  It seems like if we can't fetch the metadata within the
> > designated
> > > > metadata timeout, the future / callback should receive a
> > TimeoutException
> > > > right?  We do not want the send call to be deferred forever if
> metadata
> > > > can't be fetched.  Eventually it should fail if it can't be
> performed.
> > > >
> > > > I do think this is something that will have to be mentioned in the
> > > > compatibility section.  There is some code out there that is probably
> > > > prepared to handle a timeout exception from the send function, which
> > may
> > > > need to be updated to check for a timeout from the future or
> callback.
> > > >
> > >
> > > Correct, a timeout exception would be delivered in the future. Sure, I
> > can
> > > add that note to the KIP.
> > >
> >
> > Thanks.
> >
> > best,
> > Colin
> >
> > >
> > >
> > > > It seems like this is an existing problem.  You may fire off a lot of
> > send
> > > > calls that get blocked because the broker that is the leader for a
> > certain
> > > > partition is not responding.  I'm not sure that we need to do
> anything
> > > > special here.  On the other hand, we could make the case for a
> generic
> > "max
> > > > number of outstanding sends" configuration to prevent surprise OOMs
> in
> > the
> > > > existing cases, plus the new one we're adding.  But this feels like a
> > bit
> > > > of a scope expansion.
> > > >
> > >
> > > Right, this is an existing problem, however the asynchronous send could
> > > cause unexpected behavior. For example, if a client pinned
> > > topics/partitions to individual send threads, then memory couldn't be
> > > exhausted by a single topic since a blocking send would prevent further
> > > records from being buffered on that topic. The compromise could be that
> > we
> > > only ever permit one outstanding record batch for a topic, which would
> > keep
> > > the code simple and wouldn't permit a single topic to consume all
> > available
> > > memory.
> > >
> > >
> > >
> > > > They may be connected, but I'm not sure they should be the same.
> > Perhaps
> > > > expiry should be 4x the typical fetch rate, for example.
> > > >
> > >
> > > That's true. You could also make the case for a faster expiry than
> > refresh
> > > as well. Makes sense to separate this out.
> > >
> > >
> > >
> > > > Hmm.... are you sure this is an N^2 problem?  If you have T1 and T2,
> > you
> > > > fetch metadata for T1 and T2, right?  I guess you could argue that we
> > often
> > > > fetch metadata for partitions we don't care about, but that doesn't
> > make it
> > > > O(N^2).  Maybe there's something about the implementation that I'm
> > missing.
> > > >
> > >
> > > My apologies, I left out the context. One issue the KIP is trying to
> > > resolve is the metadata storm that's caused by producers starting up.
> In
> > > the worst case, where the send call is only performed from a single
> > thread
> > > (i.e. no possible batching), fetching metadata for 1K topics will
> > generate
> > > 1K RPCs, with payload 1+2+...+1K topics' metadata. Being smart about
> the
> > > topics being refreshed would still generate 1K RPCs for 1 topic each,
> and
> > > asynchronous behavior would permit batching (note steady-state
> refreshing
> > > doesn't require the asynchronous behavior to batch).
> > >
> > >
> > >
> > > > In general, we need to take advantage of batching to do this well
> (one
> > > > reason why I think we should steer clear of ultra-granular timeout
> > > > tracking).  It's best to do 1 RPC asking for 10 topics, not 10 RPCs
> > asking
> > > > for a single topic each, even if that means some of the topic
> timeouts
> > are
> > > > not *exactly* aligned with the configured value.
> > > >
> > >
> > > Absolutely, agreed.
> > >
> > > Thanks,
> > > Brian
> > >
> > >
> > > > best,
> > > > Colin
> > > >
> > > >
> > > > >
> > > > > Thanks,
> > > > > Brian
> > > > >
> > > > > On Mon, Nov 11, 2019 at 11:47 AM Colin McCabe <cmcc...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Hi Brian,
> > > > > >
> > > > > > Thanks for the KIP.
> > > > > >
> > > > > > Starting the metadata fetch before we need the result is
> > definitely a
> > > > > > great idea.  This way, the metadata fetch can be done in parallel
> > with
> > > > all
> > > > > > the other stuff the producer is doing, rather than forcing the
> > producer
> > > > to
> > > > > > periodically come to a halt while metadata is
> fetched.
> > > > > >
> > > > > > Maybe I missed it, but there seemed to be some details missing
> > here.
> > > > When
> > > > > > do we start the metadata fetch?  For example, if topic metadata
> > expires
> > > > > > every 5 minutes, perhaps we should wait 4 minutes, then start
> > > > fetching
> > > > > > the new metadata, which we would expect to arrive by the 5 minute
> > > > > > deadline.  Or perhaps we should start the fetch even earlier,
> > around
> > > > the
> > > > > > 2.5 minute mark.  In any case, there should be some discussion
> > about
> > > > what
> > > > > > the actual policy is.  Given that metadata.max.age.ms is
> > configurable,
> > > > > > maybe that policy needs to be expressed in terms of a percentage
> > of
> > > > the
> > > > > > refresh period rather than in terms of an absolute delay.
> > > > > >
> > > > > > The KIP correctly points out that the current metadata fetching
> > policy
> > > > > > causes us to "[block] in a function that's advertised as
> > asynchronous."
> > > > > > However, the KIP doesn't seem to spell out whether we will
> > continue to
> > > > > > block if metadata can't be found, or if this will be abolished.
> > > > Clearly,
> > > > > > starting the metadata fetch early will reduce blocking in the
> > common
> > > > case,
> > > > > > but will there still be blocking in the uncommon case where the
> > early
> > > > fetch
> > > > > > doesn't succeed in time?
> > > > > >
> > > > > >  > To address (2), the producer currently maintains an expiry
> > threshold
> > > > > > for
> > > > > >  > every topic, which is used to remove a topic from the working
> > set
> > > > at a
> > > > > >  > future time (currently hard-coded to 5 minutes, this should be
> > > > modified
> > > > > > to
> > > > > >  > use metadata.max.age.ms). While this does work to reduce the
> > size
> > > > of
> > > > > > the
> > > > > >  > topic working set, the producer will continue fetching
> metadata
> > for
> > > > > > these
> > > > > >  > topics in every metadata request for the full expiry duration.
> > This
> > > > > > logic
> > > > > >  > can be made more intelligent by managing the expiry from when
> > the
> > > > topic
> > > > > >  > was last used, enabling the expiry duration to be reduced to
> > improve
> > > > > > cases
> > > > > >  > where a large number of topics are touched intermittently.
> > > > > >
> > > > > > Can you clarify this part a bit?  It seems like we have a
> metadata
> > > > > > expiration policy now for topics, and we will have one after this
> > KIP,
> > > > but
> > > > > > they will be somewhat different.  But it's not clear to me what
> the
> > > > > > differences are.
> > > > > >
> > > > > > In general, if load is a problem, we should probably consider
> > adding
> > > > some
> > > > > > kind of jitter on the client side.  There are definitely cases
> > where
> > > > people
> > > > > > start up a lot of clients at the same time in parallel and there
> > is a
> > > > > > thundering herd problem with metadata updates.  Adding jitter
> would
> > > > spread
> > > > > > the load across time rather than creating a spike every 5 minutes
> > in
> > > > this
> > > > > > case.
> > > > > >
> > > > > > best,
> > > > > > Colin
> > > > > >
> > > > > >
> > > > > > On Fri, Nov 8, 2019, at 08:59, Ismael Juma wrote:
> > > > > > > I think this KIP affects when we block which is actually user
> > visible
> > > > > > > behavior. Right?
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Fri, Nov 8, 2019, 8:50 AM Brian Byrne <bby...@confluent.io>
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Guozhang,
> > > > > > > >
> > > > > > > > Regarding metadata expiry, no access times other than the
> > initial
> > > > > > lookup[1]
> > > > > > > > are used for determining when to expire producer metadata.
> > > > Therefore,
> > > > > > > > frequently used topics' metadata will be aged out and
> > subsequently
> > > > > > > > refreshed (in a blocking manner) every five minutes, and
> > > > infrequently
> > > > > > used
> > > > > > > > topics will be retained for a minimum of five minutes and
> > currently
> > > > > > > > refetched on every metadata update during that time period.
> The
> > > > > > sentence is
> > > > > > > > suggesting that we could reduce the expiry time to improve
> the
> > > > latter
> > > > > > > > without affecting (rather slightly improving) the former.
> > > > > > > >
> > > > > > > > Keep in mind that in almost all cases, I wouldn't anticipate
> > much of
> > > > a
> > > > > > > > difference with producer behavior, and the extra logic can be
> > > > > > implemented
> > > > > > > > to have insignificant cost. It's the large/dynamic topic
> corner
> > > > cases
> > > > > > that
> > > > > > > > we're trying to improve.
> > > > > > > >
> > > > > > > > It'd be convenient if the KIP is no longer necessary. You're
> > right
> > > > in
> > > > > > that
> > > > > > > > there are no public API changes and the behavioral changes are
> > > > entirely
> > > > > > > > internal. I'd be happy to continue the discussion around the
> > KIP,
> > > > but
> > > > > > > > unless otherwise objected, it can be retired.
> > > > > > > >
> > > > > > > > [1] Not entirely accurate, it's actually the first time when
> > the
> > > > client
> > > > > > > > calculates whether to retain the topic in its metadata.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Brian
> > > > > > > >
> > > > > > > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello Brian,
> > > > > > > > >
> > > > > > > > > Could you elaborate a bit more on this sentence: "This
> logic
> > can
> > > > be
> > > > > > made
> > > > > > > > > more intelligent by managing the expiry from when the topic
> > was
> > > > last
> > > > > > > > used,
> > > > > > > > > enabling the expiry duration to be reduced to improve cases
> > > > where a
> > > > > > large
> > > > > > > > > number of topics are touched intermittently." Not sure I
> > fully
> > > > > > understand
> > > > > > > > > the proposal.
> > > > > > > > >
> > > > > > > > > Also since now this KIP did not make any public API changes
> > and
> > > > the
> > > > > > > > > behavioral changes are not considered a public API contract
> > (i.e.
> > > > > > how we
> > > > > > > > > maintain the topic metadata in producer cache is never
> > committed
> > > > to
> > > > > > > > users),
> > > > > > > > > I wonder if we still need a KIP for the proposed change any
> > more?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Thu, Nov 7, 2019 at 12:43 PM Brian Byrne <
> > bby...@confluent.io
> > > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hello all,
> > > > > > > > > >
> > > > > > > > > > I'd like to propose a vote for a producer change to
> improve
> > > > > > producer
> > > > > > > > > > behavior when dealing with a large number of topics, in
> > part by
> > > > > > > > reducing
> > > > > > > > > > the amount of metadata fetching performed.
> > > > > > > > > >
> > > > > > > > > > The full KIP is provided here:
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
> > > > > > > > > >
> > > > > > > > > > And the discussion thread:
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > >
> >
> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Brian
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
