Hello all,

Does anyone else have opinions on the issue of RPC frequency? Would it be
better to remove the fetching of non-urgent topics altogether, so that the
refreshes are contained in a larger batch?

Thanks,
Brian


On Mon, Jan 6, 2020 at 2:40 PM Brian Byrne <bby...@confluent.io> wrote:

>
> So the performance of a metadata RPC that occurs every once every 10
> seconds should not be measured strictly in CPU cost, but rather the effect
> on the 95-99%. The larger the request is, the more opportunity there is to
> put a burst stress on the producer and broker, and the larger the response
> payload to push through the control plane socket. Maybe that's not at 5k
> topics, but there are groups that are 10k+ topics and pushing further.
>
> There's definitely weight to the metadata RPCs. Looking at a previous
> local, non-loaded test I ran, I calculate about 2 microseconds per
> partition latency to the producer. At 10,000 topics with 100 partitions
> each, that's a full 2-second bubble in the best case. I can rerun a more
> targeted performance test, but I feel that's missing the point.
>
> If you're arguing that the batch sizes are too small or time interval too
> short, that's fine, and I'll be happy to increase them. However, there is a
> breaking point to justify refreshing a subset of a producer's topics. Even
> if producers out number brokers by 10x, the fixed overhead cost of an RPC
> that occurs every 1-second is probably not worth scraping for performance,
> but worth sanitizing.
>
> Brian
>
> On Mon, Jan 6, 2020 at 1:31 PM Colin McCabe <cmcc...@apache.org> wrote:
>
>> On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote:
>> > Hi Colin,
>> >
>> > Thanks again for the feedback!
>> >
>> > On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe <cmcc...@apache.org>
>> wrote:
>> >
>> > > Metadata requests don't (always) go to the controller, right?  We
>> should
>> > > fix the wording in this section.
>> > >
>> >
>> > You're correct, s/controller/broker(s)/.
>> >
>> > I feel like "Proposed Changes" should come before "Public Interfaces"
>> > > here.  The new configuration won't make sense to the reader until he
>> or she
>> > > has read the "changes" section.  Also, it's not clear from the name
>> that
>> > > "metadata evict" refers to a span of time.  What do you think about "
>> > > metadata.eviction.period.ms" as a configuration name?
>> > >
>> >
>> > Sure, makes sense. Updated order and config name.
>> >
>> >
>> > > Where is the "10 seconds" coming from here?  The current default for
>> > > metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want
>> to
>> > > refresh every 5 minutes.  Definitely not every 10 seconds.
>> > >
>> >
>> > The 10 seconds is another arbitrary value to prevent a large number of
>> RPCs
>> > if the target batch size were fixed at 20. For example, if there's 5,000
>> > topics with a 5-minute interval, then instead of issuing an RPC every
>> > 1.2 seconds with batch size of 20, it would issue an RPC every 10
>> seconds
>> > with batch size of 167.
>> >
>>
>> Hmm.  This will lead to many more RPCs compared to the current situation
>> of issuing an RPC every 5 minutes with 5,000 topics, right?  See below for
>> more discussion.
>>
>> >
>> >
>> > > Stepping back a little bit, it seems like the big problem you
>> identified
>> > > is the O(N^2) behavior of producing to X, then Y, then Z, etc. etc.
>> where
>> > > each new produce to a fresh topic triggers a metadata request with
>> all the
>> > > preceding topics included.
>> > >
>> > > Of course we need to send out a metadata request before producing to
>> X,
>> > > then Y, then Z, but that request could just specify X, or just
>> specify Y,
>> > > etc. etc.  In other words, we could decouple decouple the routine
>> metadata
>> > > fetch which happens on a 5 minute timer from the need to fetch
>> metadata for
>> > > something specific right now.
>> > >
>> > > I guess my question is, do we really need to allow routine metadata
>> > > fetches to "piggyback" on the emergency metadata fetches?  It adds a
>> lot of
>> > > complexity, and we don't have any benchmarks proving that it's better.
>> > > Also, as I understand it, whether we piggyback or not, the number of
>> > > metadata fetches is the same, right?
>> > >
>> >
>> > So it's possible to do as you suggest, but I would argue that it'd be
>> more
>> > complex with how the code is structured and wouldn't add any extra
>> > complexity. The derived metadata class effectively respond to a
>> > notification that a metadata RPC is going to be issued, where they
>> return
>> > the metadata request structure with topics to refresh, which is
>> decoupled
>> > from what generated the event (new topic, stale metadata, periodic
>> refresh
>> > alarm). There is also a strict implementation detail that only one
>> metadata
>> > request can be outstanding at any time, which lends itself to
>> consolidate
>> > complexity in the base metadata class and have the derived use the
>> "listen
>> > for next update" model.
>> >
>> > By maintaining an ordered list of topics by their last metadata refresh
>> > time (0 for new topics), it's a matter of pulling from the front of the
>> > list to see which topics should be included in the next update. Always
>> > include all urgent topics, then include non-urgent (i.e. need to be
>> > refreshed soon-ish) up to the target batch size.
>> >
>> > The number of metadata fetches could be reduced. Assuming a target batch
>> > size of 20, a new topic might also refresh 19 other "refresh soon"
>> topics
>> > in the same RPC, as opposed to those 19 being handled in a subsequent
>> RPC.
>> >
>> > Although to counter the above, the batching/piggybacking logic isn't
>> > necessarily about reducing the total number of RPCs, but rather to
>> > distribute the load more evenly over time. For example, if a large
>> number
>> > of topics need to be refreshed at the approximate same time (common for
>> > startups cases that hit a large number of topics), the updates are more
>> > evenly distributed to avoid a flood.
>>
>> It wouldn't be a flood in the current case, right?  It would just be a
>> single metadata request for a lot of topics.
>>
>> Let's compare the two cases.  In the current scenario, we have 1 metadata
>> request every 5 minutes.  This request is for 5,000 topics (let's say).  In
>> the new scenario, we have a request every 10 seconds for 167 topics each.
>>
>> Which do you think will be more expensive?  I think the second scenario
>> certainly will because of the overhead of 30x as many requests send over
>> the wire.  Metadata accesses are now lockless, so the big metadata request
>> just isn't that much of a problem.  I bet if you benchmark it, sending back
>> metadata for 167 topics won't be that much cheaper than sending back
>> metadata for 5k.  Certainly not 30x cheaper.  There will eventually be a
>> point where we need to split metadata requests, but it's definitely not at
>> 5,000 topics.
>>
>> regards,
>> Colin
>>
>>
>> >
>> > Brian
>> >
>> >
>> >
>> > > On Mon, Jan 6, 2020, at 10:26, Lucas Bradstreet wrote:
>> > > > +1 (non-binding)
>> > > >
>> > > > On Thu, Jan 2, 2020 at 11:15 AM Brian Byrne <bby...@confluent.io>
>> wrote:
>> > > >
>> > > > > Hello all,
>> > > > >
>> > > > > After further discussion and improvements, I'd like to reinstate
>> the
>> > > voting
>> > > > > process.
>> > > > >
>> > > > > The updated KIP:
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-526
>> > > > > %3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
>> > > > > <
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
>> > > >
>> > > > >
>> > > > > The continued discussion:
>> > > > >
>> > > > >
>> > >
>> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
>> > > > >
>> > > > > I'd be happy to address any further comments/feedback.
>> > > > >
>> > > > > Thanks,
>> > > > > Brian
>> > > > >
>> > > > > On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang <wangg...@gmail.com
>> >
>> > > wrote:
>> > > > >
>> > > > > > With the concluded summary on the other discussion thread, I'm
>> +1 on
>> > > the
>> > > > > > proposal.
>> > > > > >
>> > > > > > Thanks Brian!
>> > > > > >
>> > > > > > On Tue, Nov 19, 2019 at 8:00 PM deng ziming <
>> > > dengziming1...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > >
>> > > > > > > > For new (uncached) topics, one problem here is that we
>> don't know
>> > > > > which
>> > > > > > > > partition to map a record to in the event that it has a key
>> or
>> > > custom
>> > > > > > > > partitioner, so the RecordAccumulator wouldn't know which
>> > > > > batch/broker
>> > > > > > it
>> > > > > > > > belongs. We'd need an intermediate record queue that
>> subsequently
>> > > > > moved
>> > > > > > > the
>> > > > > > > > records into RecordAccumulators once metadata resolution was
>> > > > > complete.
>> > > > > > > For
>> > > > > > > > known topics, we don't currently block at all in
>> waitOnMetadata.
>> > > > > > > >
>> > > > > > >
>> > > > > > > You are right, I forget this fact, and the intermediate record
>> > > queue
>> > > > > will
>> > > > > > > help, but I have some questions
>> > > > > > >
>> > > > > > > if we add an intermediate record queue in KafkaProducer, when
>> > > should we
>> > > > > > > move the records into RecordAccumulators?
>> > > > > > > only NetworkClient is aware of the MetadataResponse, here is
>> the
>> > > > > > > hierarchical structure of the related classes:
>> > > > > > > KafkaProducer
>> > > > > > >     Accumulator
>> > > > > > >     Sender
>> > > > > > >         NetworkClient
>> > > > > > >             metadataUpdater.handleCompletedMetadataResponse
>> > > > > > >
>> > > > > > > so
>> > > > > > > 1. we should also add a metadataUpdater to KafkaProducer?
>> > > > > > > 2. if the topic really does not exists? the intermediate
>> record
>> > > queue
>> > > > > > will
>> > > > > > > become too large?
>> > > > > > > 3. and should we `block` when the intermediate record queue
>> is too
>> > > > > large?
>> > > > > > > and this will again bring the blocking problem?
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne <
>> bby...@confluent.io>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Deng,
>> > > > > > > >
>> > > > > > > > Thanks for the feedback.
>> > > > > > > >
>> > > > > > > > On Mon, Nov 18, 2019 at 6:56 PM deng ziming <
>> > > > > dengziming1...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > hi, I reviewed the current code, the ProduceMetadata
>> maintains
>> > > an
>> > > > > > > expiry
>> > > > > > > > > threshold for every topic, every time when we write to a
>> topic
>> > > we
>> > > > > > will
>> > > > > > > > set
>> > > > > > > > > the expiry time to -1 to indicate it should be updated,
>> this
>> > > does
>> > > > > > work
>> > > > > > > to
>> > > > > > > > > reduce the size of the topic working set, but the
>> producer will
>> > > > > > > continue
>> > > > > > > > > fetching metadata for these topics in every metadata
>> request
>> > > for
>> > > > > the
>> > > > > > > full
>> > > > > > > > > expiry duration.
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > > Indeed, you are correct, I terribly misread the code here.
>> > > > > Fortunately
>> > > > > > > this
>> > > > > > > > was only a minor optimization in the KIP that's no longer
>> > > necessary.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > and we can improve the situation by 2 means:
>> > > > > > > > >     1. we maintain a refresh threshold for every topic
>> which
>> > > is for
>> > > > > > > > example
>> > > > > > > > > 0.8 * expiry_threshold, and when we send
>> `MetadataRequest` to
>> > > > > brokers
>> > > > > > > we
>> > > > > > > > > just request unknownLeaderTopics + unknownPartitionTopics
>> +
>> > > topics
>> > > > > > > > > reach refresh threshold.
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > > Right, this is similar to what I suggested, with a larger
>> window
>> > > on
>> > > > > the
>> > > > > > > > "staleness" that permits for batching to an appropriate size
>> > > (except
>> > > > > if
>> > > > > > > > there's any unknown topics, you'd want to issue the request
>> > > > > > immediately).
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > >     2. we don't invoke KafkaProducer#waitOnMetadata when
>> we
>> > > call
>> > > > > > > > > KafkaProducer#send because of we just send data to
>> > > > > RecordAccumulator,
>> > > > > > > and
>> > > > > > > > > before we send data to brokers we will invoke
>> > > > > > > RecordAccumulator#ready(),
>> > > > > > > > so
>> > > > > > > > > we can only invoke waitOnMetadata to block when (number
>> topics
>> > > > > > > > > reach refresh threshold)>(number of all known topics)*0.2.
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > > For new (uncached) topics, one problem here is that we
>> don't know
>> > > > > which
>> > > > > > > > partition to map a record to in the event that it has a key
>> or
>> > > custom
>> > > > > > > > partitioner, so the RecordAccumulator wouldn't know which
>> > > > > batch/broker
>> > > > > > it
>> > > > > > > > belongs. We'd need an intermediate record queue that
>> subsequently
>> > > > > moved
>> > > > > > > the
>> > > > > > > > records into RecordAccumulators once metadata resolution was
>> > > > > complete.
>> > > > > > > For
>> > > > > > > > known topics, we don't currently block at all in
>> waitOnMetadata.
>> > > > > > > >
>> > > > > > > > The last major point of minimizing producer startup metadata
>> > > RPCs may
>> > > > > > > still
>> > > > > > > > need to be improved, but this would be a large improvement
>> on the
>> > > > > > current
>> > > > > > > > situation.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Brian
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > > I think the above 2 ways are enough to solve the current
>> > > problem.
>> > > > > > > > >
>> > > > > > > > > On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe <
>> > > cmcc...@apache.org>
>> > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote:
>> > > > > > > > > > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe <
>> > > > > cmcc...@apache.org
>> > > > > > >
>> > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Two seconds doesn't seem like a reasonable amount of
>> > > time to
>> > > > > > > leave
>> > > > > > > > > for
>> > > > > > > > > > the
>> > > > > > > > > > > > metadata fetch.  Fetching halfway through the
>> expiration
>> > > > > period
>> > > > > > > > seems
>> > > > > > > > > > more
>> > > > > > > > > > > > reasonable.  It also doesn't require us to create a
>> new
>> > > > > > > > configuration
>> > > > > > > > > > key,
>> > > > > > > > > > > > which is nice.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Another option is to just do the metadata fetch
>> every
>> > > > > > > > > > metadata.max.age.ms,
>> > > > > > > > > > > > but not expire the topic until we can't fetch the
>> > > metadata
>> > > > > for
>> > > > > > 2
>> > > > > > > *
>> > > > > > > > > > > > metadata.max.age.ms.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > I'd expect two seconds to be reasonable in the common
>> case.
>> > > > > Keep
>> > > > > > in
>> > > > > > > > > mind
>> > > > > > > > > > > that this doesn't affect correctness, and a control
>> > > operation
>> > > > > > > > returning
>> > > > > > > > > > > cached metadata should be on the order of
>> milliseconds.
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Hi Brian,
>> > > > > > > > > >
>> > > > > > > > > > Thanks again for the KIP.
>> > > > > > > > > >
>> > > > > > > > > > I think the issue here is not the common case, but the
>> > > uncommon
>> > > > > > case
>> > > > > > > > > where
>> > > > > > > > > > the metadata fetch takes longer than expected.  In that
>> > > case, we
>> > > > > > > don't
>> > > > > > > > > want
>> > > > > > > > > > to be in the position of having our metadata expire
>> because
>> > > we
>> > > > > > waited
>> > > > > > > > too
>> > > > > > > > > > long to renew it.
>> > > > > > > > > >
>> > > > > > > > > > This is one reason why I think that the metadata
>> expiration
>> > > time
>> > > > > > > should
>> > > > > > > > > be
>> > > > > > > > > > longer than the metadata refresh time.  In fact, it
>> might be
>> > > > > worth
>> > > > > > > > having
>> > > > > > > > > > two separate configuration keys for these two values.  I
>> > > could
>> > > > > > > imagine
>> > > > > > > > a
>> > > > > > > > > > user who is having trouble with metadata expiration
>> wanting
>> > > to
>> > > > > > > increase
>> > > > > > > > > the
>> > > > > > > > > > metadata expiration time, but without increasing the
>> metadata
>> > > > > > refresh
>> > > > > > > > > > period.  In a sense, the metadata expiration time is
>> like
>> > > the ZK
>> > > > > > > > session
>> > > > > > > > > > expiration time.  You might want to turn it up if the
>> > > cluster is
>> > > > > > > > > > experiencing load spikes.
>> > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > But to the general
>> > > > > > > > > > > point, defining the algorithm would mean enforcing it
>> to
>> > > fair
>> > > > > > > > accuracy,
>> > > > > > > > > > > whereas if the suggestion is that it'll be performed
>> at a
>> > > > > > > reasonable
>> > > > > > > > > > time,
>> > > > > > > > > > > it allows for batching and other optimizations.
>> Perhaps I
>> > > > > > shouldn't
>> > > > > > > > be
>> > > > > > > > > > > regarding what's defined in a KIP to be contractual in
>> > > these
>> > > > > > cases,
>> > > > > > > > but
>> > > > > > > > > > you
>> > > > > > > > > > > could consider a first implementation to collect
>> topics
>> > > whose
>> > > > > > > > metadata
>> > > > > > > > > > has
>> > > > > > > > > > > exceeded (metadata.max.age.ms / 2), and sending the
>> batch
>> > > > > once a
>> > > > > > > > > > > constituent topic's metadata is near the expiry, or a
>> > > > > sufficient
>> > > > > > > > number
>> > > > > > > > > > of
>> > > > > > > > > > > topics have been collected (10? 100? 1000?).
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > I'm concerned that if we change the metadata caching
>> strategy
>> > > > > > without
>> > > > > > > > > > discussing it first, it may improve certain workloads
>> but
>> > > make
>> > > > > > others
>> > > > > > > > > > worse.  We need to be concrete about what the proposed
>> > > strategy
>> > > > > is
>> > > > > > so
>> > > > > > > > > that
>> > > > > > > > > > we can really evaluate it.
>> > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > > We should be specific about what happens if the
>> first few
>> > > > > > > metadata
>> > > > > > > > > > fetches
>> > > > > > > > > > > > fail.  Do we use exponential backoff to decide when
>> to
>> > > > > resend?
>> > > > > > > It
>> > > > > > > > > > seems
>> > > > > > > > > > > > like we really should, for all the usual reasons
>> (reduce
>> > > the
>> > > > > > load
>> > > > > > > > on
>> > > > > > > > > > > > brokers, ride out temporary service disruptions,
>> etc.)
>> > > Maybe
>> > > > > > we
>> > > > > > > > > could
>> > > > > > > > > > have
>> > > > > > > > > > > > an exponential retry backoff for each broker (in
>> other
>> > > words,
>> > > > > > we
>> > > > > > > > > > should try
>> > > > > > > > > > > > to contact a different broker before applying the
>> > > backoff.)
>> > > > > I
>> > > > > > > > think
>> > > > > > > > > > this
>> > > > > > > > > > > > already sort of happens with the disconnect
>> timeout, but
>> > > we
>> > > > > > might
>> > > > > > > > > need
>> > > > > > > > > > a
>> > > > > > > > > > > > more general solution.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > I don't plan to change this behavior. Currently it
>> retries
>> > > > > after
>> > > > > > a
>> > > > > > > > > fixed
>> > > > > > > > > > > value of 'retry.backoff.ms' (defaults to 100 ms).
>> It's
>> > > > > possible
>> > > > > > > that
>> > > > > > > > > > > different brokers are attempted, but I haven't dug
>> into it.
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > I think it's critical to understand what the current
>> > > behavior is
>> > > > > > > before
>> > > > > > > > > we
>> > > > > > > > > > try to change it.  The difference between retrying the
>> same
>> > > > > broker
>> > > > > > > and
>> > > > > > > > > > trying a different one has a large impact it has on
>> cluster
>> > > load
>> > > > > > and
>> > > > > > > > > > latency.  For what it's worth, I believe the behavior
>> is the
>> > > > > second
>> > > > > > > > one,
>> > > > > > > > > > but it has been a while since I checked.  Let's figure
>> this
>> > > out.
>> > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > > Thanks for the clarification.  Fully asynchronous
>> is the
>> > > way
>> > > > > to
>> > > > > > > > go, I
>> > > > > > > > > > > > agree.  I'm having trouble understanding how
>> timeouts are
>> > > > > > handled
>> > > > > > > > in
>> > > > > > > > > > the
>> > > > > > > > > > > > KIP.  It seems like if we can't fetch the metadata
>> > > within the
>> > > > > > > > > > designated
>> > > > > > > > > > > > metadata timeout, the future / callback should
>> receive a
>> > > > > > > > > > TimeoutException
>> > > > > > > > > > > > right?  We do not want the send call to be deferred
>> > > forever
>> > > > > if
>> > > > > > > > > metadata
>> > > > > > > > > > > > can't be fetched.  Eventually it should fail if it
>> can't
>> > > be
>> > > > > > > > > performed.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I do think this is something that will have to be
>> > > mentioned
>> > > > > in
>> > > > > > > the
>> > > > > > > > > > > > compatibility section.  There is some code out there
>> > > that is
>> > > > > > > > probably
>> > > > > > > > > > > > prepared to handle a timeout exception from the send
>> > > > > function,
>> > > > > > > > which
>> > > > > > > > > > may
>> > > > > > > > > > > > need to be updated to check for a timeout from the
>> > > future or
>> > > > > > > > > callback.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Correct, a timeout exception would be delivered in the
>> > > future.
>> > > > > > > Sure,
>> > > > > > > > I
>> > > > > > > > > > can
>> > > > > > > > > > > add that note to the KIP.
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Thanks.
>> > > > > > > > > >
>> > > > > > > > > > best,
>> > > > > > > > > > Colin
>> > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > > It seems like this is an existing problem.  You may
>> fire
>> > > off
>> > > > > a
>> > > > > > > lot
>> > > > > > > > of
>> > > > > > > > > > send
>> > > > > > > > > > > > calls that get blocked because the broker that is
>> the
>> > > leader
>> > > > > > for
>> > > > > > > a
>> > > > > > > > > > certain
>> > > > > > > > > > > > partition is not responding.  I'm not sure that we
>> need
>> > > to do
>> > > > > > > > > anything
>> > > > > > > > > > > > special here.  On the other hand, we could make the
>> case
>> > > for
>> > > > > a
>> > > > > > > > > generic
>> > > > > > > > > > "max
>> > > > > > > > > > > > number of outstanding sends" configuration to
>> prevent
>> > > > > surprise
>> > > > > > > OOMs
>> > > > > > > > > in
>> > > > > > > > > > the
>> > > > > > > > > > > > existing cases, plus the new one we're adding.  But
>> this
>> > > > > feels
>> > > > > > > > like a
>> > > > > > > > > > bit
>> > > > > > > > > > > > of a scope expansion.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Right, this is an existing problem, however the
>> > > asynchronous
>> > > > > send
>> > > > > > > > could
>> > > > > > > > > > > cause unexpected behavior. For example, if a client
>> pinned
>> > > > > > > > > > > topics/partitions to individual send threads, then
>> memory
>> > > > > > couldn't
>> > > > > > > be
>> > > > > > > > > > > exhausted by a single topic since a blocking send
>> would
>> > > prevent
>> > > > > > > > further
>> > > > > > > > > > > records from being buffered on that topic. The
>> compromise
>> > > could
>> > > > > > be
>> > > > > > > > that
>> > > > > > > > > > we
>> > > > > > > > > > > only ever permit one outstanding record batch for a
>> topic,
>> > > > > which
>> > > > > > > > would
>> > > > > > > > > > keep
>> > > > > > > > > > > the code simple and wouldn't permit a single topic to
>> > > consume
>> > > > > all
>> > > > > > > > > > available
>> > > > > > > > > > > memory.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > > They may be connected, but I'm not sure they should
>> be
>> > > the
>> > > > > > same.
>> > > > > > > > > > Perhaps
>> > > > > > > > > > > > expiry should be 4x the typical fetch rate, for
>> example.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > That's true. You could also make the case for a faster
>> > > expiry
>> > > > > > than
>> > > > > > > > > > refresh
>> > > > > > > > > > > as well. Makes sense to separate this out.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > > Hmm.... are you sure this is an N^2 problem?  If you
>> > > have T1
>> > > > > > and
>> > > > > > > > T2,
>> > > > > > > > > > you
>> > > > > > > > > > > > fetch metadata for T1 and T2, right?  I guess you
>> could
>> > > argue
>> > > > > > > that
>> > > > > > > > we
>> > > > > > > > > > often
>> > > > > > > > > > > > fetch metadata for partitions we don't care about,
>> but
>> > > that
>> > > > > > > doesn't
>> > > > > > > > > > make it
>> > > > > > > > > > > > O(N^2).  Maybe there's something about the
>> implementation
>> > > > > that
>> > > > > > > I'm
>> > > > > > > > > > missing.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > My apologies, I left out the context. One issue the
>> KIP is
>> > > > > trying
>> > > > > > > to
>> > > > > > > > > > > resolve is the metadata storm that's caused by
>> producers
>> > > > > starting
>> > > > > > > up.
>> > > > > > > > > In
>> > > > > > > > > > > the worst case, where the send call is only performed
>> from
>> > > a
>> > > > > > single
>> > > > > > > > > > thread
>> > > > > > > > > > > (i.e. no possible batching), fetching metadata for 1K
>> > > topics
>> > > > > will
>> > > > > > > > > > generate
>> > > > > > > > > > > 1K RPCs, with payload 1+2+...+1K topics' metadata.
>> Being
>> > > smart
>> > > > > > > about
>> > > > > > > > > the
>> > > > > > > > > > > topics being refreshed would still generate 1K RPCs
>> for 1
>> > > topic
>> > > > > > > each,
>> > > > > > > > > and
>> > > > > > > > > > > asynchronous behavior would permit batching (note
>> > > steady-state
>> > > > > > > > > refreshing
>> > > > > > > > > > > doesn't require the asynchronous behavior to batch).
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > > In general, we need to take advantage of batching
>> to do
>> > > this
>> > > > > > well
>> > > > > > > > > (one
>> > > > > > > > > > > > reason why I think we should steer clear of
>> > > ultra-granular
>> > > > > > > timeout
>> > > > > > > > > > > > tracking).  It's best to do 1 RPC asking for 10
>> topics,
>> > > not
>> > > > > 10
>> > > > > > > RPCs
>> > > > > > > > > > asking
>> > > > > > > > > > > > for a single topic each, even if that means some of
>> the
>> > > topic
>> > > > > > > > > timeouts
>> > > > > > > > > > are
>> > > > > > > > > > > > not *exactly* aligned with the configured value.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > Absolutely, agreed.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > > Brian
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > > best,
>> > > > > > > > > > > > Colin
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Brian
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Mon, Nov 11, 2019 at 11:47 AM Colin McCabe <
>> > > > > > > > cmcc...@apache.org>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi Brian,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks for the KIP.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Starting the metadata fetch before we need the
>> > > result is
>> > > > > > > > > > definitely a
>> > > > > > > > > > > > > > great idea.  This way, the metadata fetch can be
>> > > done in
>> > > > > > > > parallel
>> > > > > > > > > > with
>> > > > > > > > > > > > all
>> > > > > > > > > > > > > > the other stuff e producer is doing, rather than
>> > > forcing
>> > > > > > the
>> > > > > > > > > > producer
>> > > > > > > > > > > > to
>> > > > > > > > > > > > > > periodically come to a halt periodically while
>> > > metadata
>> > > > > is
>> > > > > > > > > fetched.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Maybe I missed it, but there seemed to be some
>> > > details
>> > > > > > > missing
>> > > > > > > > > > here.
>> > > > > > > > > > > > When
>> > > > > > > > > > > > > > do we start the metadata fetch?  For example, if
>> > > topic
>> > > > > > > metadata
>> > > > > > > > > > expires
>> > > > > > > > > > > > > > every 5 minutes, perhaps we should wait 4
>> minutes,
>> > > then
>> > > > > > > > starting
>> > > > > > > > > > > > fetching
>> > > > > > > > > > > > > > the new metadata, which we would expect to
>> arrive by
>> > > the
>> > > > > 5
>> > > > > > > > minute
>> > > > > > > > > > > > > > deadline.  Or perhaps we should start the fetch
>> even
>> > > > > > earlier,
>> > > > > > > > > > around
>> > > > > > > > > > > > the
>> > > > > > > > > > > > > > 2.5 minute mark.  In any case, there should be
>> some
>> > > > > > > discussion
>> > > > > > > > > > about
>> > > > > > > > > > > > what
>> > > > > > > > > > > > > > the actual policy is.  Given that
>> > > metadata.max.age.ms is
>> > > > > > > > > > configurable,
>> > > > > > > > > > > > > > maybe that policy  needs to be expressed in
>> terms of
>> > > a
>> > > > > > > > percentage
>> > > > > > > > > > of
>> > > > > > > > > > > > the
>> > > > > > > > > > > > > > refresh period rather than in terms of an
>> absolute
>> > > delay.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The KIP correctly points out that the current
>> > > metadata
>> > > > > > > fetching
>> > > > > > > > > > policy
>> > > > > > > > > > > > > > causes us to "[block] in a function that's
>> > > advertised as
>> > > > > > > > > > asynchronous."
>> > > > > > > > > > > > > > However, the KIP doesn't seem to spell out
>> whether we
>> > > > > will
>> > > > > > > > > > continue to
>> > > > > > > > > > > > > > block if metadata can't be found, or if this
>> will be
>> > > > > > > abolished.
>> > > > > > > > > > > > Clearly,
>> > > > > > > > > > > > > > starting the metadata fetch early will reduce
>> > > blocking in
>> > > > > > the
>> > > > > > > > > > common
>> > > > > > > > > > > > case,
>> > > > > > > > > > > > > > but will there still be blocking in the
>> uncommon case
>> > > > > where
>> > > > > > > the
>> > > > > > > > > > early
>> > > > > > > > > > > > fetch
>> > > > > > > > > > > > > > doesn't succeed in time?
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >  > To address (2), the producer currently
>> maintains
>> > > an
>> > > > > > expiry
>> > > > > > > > > > threshold
>> > > > > > > > > > > > > > for
>> > > > > > > > > > > > > >  > every topic, which is used to remove a topic
>> from
>> > > the
>> > > > > > > > working
>> > > > > > > > > > set
>> > > > > > > > > > > > at a
>> > > > > > > > > > > > > >  > future time (currently hard-coded to 5
>> minutes,
>> > > this
>> > > > > > > should
>> > > > > > > > be
>> > > > > > > > > > > > modified
>> > > > > > > > > > > > > > to
>> > > > > > > > > > > > > >  > use metadata.max.age.ms). While this does
>> work to
>> > > > > > reduce
>> > > > > > > > the
>> > > > > > > > > > size
>> > > > > > > > > > > > of
>> > > > > > > > > > > > > > the
>> > > > > > > > > > > > > >  > topic working set, the producer will continue
>> > > fetching
>> > > > > > > > > metadata
>> > > > > > > > > > for
>> > > > > > > > > > > > > > these
>> > > > > > > > > > > > > >  > topics in every metadata request for the full
>> > > expiry
>> > > > > > > > duration.
>> > > > > > > > > > This
>> > > > > > > > > > > > > > logic
>> > > > > > > > > > > > > >  > can be made more intelligent by managing the
>> > > expiry
>> > > > > from
>> > > > > > > > when
>> > > > > > > > > > the
>> > > > > > > > > > > > topic
>> > > > > > > > > > > > > >  > was last used, enabling the expiry duration
>> to be
>> > > > > > reduced
>> > > > > > > to
>> > > > > > > > > > improve
>> > > > > > > > > > > > > > cases
>> > > > > > > > > > > > > >  > where a large number of topics are touched
>> > > > > > intermittently.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Can you clarify this part a bit?  It seems like
>> we
>> > > have a
>> > > > > > > > > metadata
>> > > > > > > > > > > > > > expiration policy now for topics, and we will
>> have
>> > > one
>> > > > > > after
>> > > > > > > > this
>> > > > > > > > > > KIP,
>> > > > > > > > > > > > but
>> > > > > > > > > > > > > > they will be somewhat different.  But it's not
>> clear
>> > > to
>> > > > > me
>> > > > > > > what
>> > > > > > > > > the
>> > > > > > > > > > > > > > differences are.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > In general, if load is a problem, we should
>> probably
>> > > > > > consider
>> > > > > > > > > > adding
>> > > > > > > > > > > > some
>> > > > > > > > > > > > > > kind of jitter on the client side.  There are
>> > > definitely
>> > > > > > > cases
>> > > > > > > > > > where
>> > > > > > > > > > > > people
>> > > > > > > > > > > > > > start up a lot of clients at the same time in
>> > > parallel
>> > > > > and
>> > > > > > > > there
>> > > > > > > > > > is a
>> > > > > > > > > > > > > > thundering herd problem with metadata updates.
>> > > Adding
>> > > > > > jitter
>> > > > > > > > > would
>> > > > > > > > > > > > spread
>> > > > > > > > > > > > > > the load across time rather than creating a
>> spike
>> > > every 5
>> > > > > > > > minutes
>> > > > > > > > > > in
>> > > > > > > > > > > > this
>> > > > > > > > > > > > > > case.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > best,
>> > > > > > > > > > > > > > Colin
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Fri, Nov 8, 2019, at 08:59, Ismael Juma
>> wrote:
>> > > > > > > > > > > > > > > I think this KIP affects when we block which
>> is
>> > > > > actually
>> > > > > > > user
>> > > > > > > > > > visible
>> > > > > > > > > > > > > > > behavior. Right?
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Ismael
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Fri, Nov 8, 2019, 8:50 AM Brian Byrne <
>> > > > > > > > bby...@confluent.io>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Hi Guozhang,
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Regarding metadata expiry, no access times
>> other
>> > > than
>> > > > > > the
>> > > > > > > > > > initial
>> > > > > > > > > > > > > > lookup[1]
>> > > > > > > > > > > > > > > > are used for determining when to expire
>> producer
>> > > > > > > metadata.
>> > > > > > > > > > > > Therefore,
>> > > > > > > > > > > > > > > > frequently used topics' metadata will be
>> aged
>> > > out and
>> > > > > > > > > > subsequently
>> > > > > > > > > > > > > > > > refreshed (in a blocking manner) every five
>> > > minutes,
>> > > > > > and
>> > > > > > > > > > > > infrequently
>> > > > > > > > > > > > > > used
>> > > > > > > > > > > > > > > > topics will be retained for a minimum of
>> five
>> > > minutes
>> > > > > > and
>> > > > > > > > > > currently
>> > > > > > > > > > > > > > > > refetched on every metadata update during
>> that
>> > > time
>> > > > > > > period.
>> > > > > > > > > The
>> > > > > > > > > > > > > > sentence is
>> > > > > > > > > > > > > > > > suggesting that we could reduce the expiry
>> time
>> > > to
>> > > > > > > improve
>> > > > > > > > > the
>> > > > > > > > > > > > latter
>> > > > > > > > > > > > > > > > without affecting (rather slightly
>> improving) the
>> > > > > > former.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Keep in mind that in most all cases, I
>> wouldn't
>> > > > > > > anticipate
>> > > > > > > > > > much of
>> > > > > > > > > > > > a
>> > > > > > > > > > > > > > > > difference with producer behavior, and the
>> extra
>> > > > > logic
>> > > > > > > can
>> > > > > > > > be
>> > > > > > > > > > > > > > implemented
>> > > > > > > > > > > > > > > > to have insignificant cost. It's the
>> > > large/dynamic
>> > > > > > topic
>> > > > > > > > > corner
>> > > > > > > > > > > > cases
>> > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > we're trying to improve.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > It'd be convenient if the KIP is no longer
>> > > necessary.
>> > > > > > > > You're
>> > > > > > > > > > right
>> > > > > > > > > > > > in
>> > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > there's no public API changes and the
>> behavioral
>> > > > > > changes
>> > > > > > > > are
>> > > > > > > > > > > > entirely
>> > > > > > > > > > > > > > > > internal. I'd be happy to continue the
>> discussion
>> > > > > > around
>> > > > > > > > the
>> > > > > > > > > > KIP,
>> > > > > > > > > > > > but
>> > > > > > > > > > > > > > > > unless otherwise objected, it can be
>> retired.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > [1] Not entirely accurate, it's actually the
>> > > first
>> > > > > time
>> > > > > > > > when
>> > > > > > > > > > the
>> > > > > > > > > > > > client
>> > > > > > > > > > > > > > > > calculates whether to retain the topic in
>> its
>> > > > > metadata.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > Brian
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang
>> Wang <
>> > > > > > > > > > wangg...@gmail.com>
>> > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Hello Brian,
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Could you elaborate a bit more on this
>> > > sentence:
>> > > > > > "This
>> > > > > > > > > logic
>> > > > > > > > > > can
>> > > > > > > > > > > > be
>> > > > > > > > > > > > > > made
>> > > > > > > > > > > > > > > > > more intelligent by managing the expiry
>> from
>> > > when
>> > > > > the
>> > > > > > > > topic
>> > > > > > > > > > was
>> > > > > > > > > > > > last
>> > > > > > > > > > > > > > > > used,
>> > > > > > > > > > > > > > > > > enabling the expiry duration to be
>> reduced to
>> > > > > improve
>> > > > > > > > cases
>> > > > > > > > > > > > where a
>> > > > > > > > > > > > > > large
>> > > > > > > > > > > > > > > > > number of topics are touched
>> intermittently."
>> > > Not
>> > > > > > sure
>> > > > > > > I
>> > > > > > > > > > fully
>> > > > > > > > > > > > > > understand
>> > > > > > > > > > > > > > > > > the proposal.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Also since now this KIP did not make any
>> > > public API
>> > > > > > > > changes
>> > > > > > > > > > and
>> > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > behavioral changes are not considered a
>> public
>> > > API
>> > > > > > > > contract
>> > > > > > > > > > (i.e.
>> > > > > > > > > > > > > > how we
>> > > > > > > > > > > > > > > > > maintain the topic metadata in producer
>> cache
>> > > is
>> > > > > > never
>> > > > > > > > > > committed
>> > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > users),
>> > > > > > > > > > > > > > > > > I wonder if we still need a KIP for the
>> > > proposed
>> > > > > > change
>> > > > > > > > any
>> > > > > > > > > > more?
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Guozhang
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Thu, Nov 7, 2019 at 12:43 PM Brian
>> Byrne <
>> > > > > > > > > > bby...@confluent.io
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Hello all,
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > I'd like to propose a vote for a
>> producer
>> > > change
>> > > > > to
>> > > > > > > > > improve
>> > > > > > > > > > > > > > producer
>> > > > > > > > > > > > > > > > > > behavior when dealing with a large
>> number of
>> > > > > > topics,
>> > > > > > > in
>> > > > > > > > > > part by
>> > > > > > > > > > > > > > > > reducing
>> > > > > > > > > > > > > > > > > > the amount of metadata fetching
>> performed.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > The full KIP is provided here:
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > And the discussion thread:
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > >
>> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > > > Brian
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > --
>> > > > > > > > > > > > > > > > > -- Guozhang
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to