Hello all,

Does anyone else have opinions on the issue of RPC frequency? Would it be better to remove the fetching of non-urgent topics altogether, so that the refreshes are contained in a larger batch?
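For concreteness, a rough sketch of the consolidated batching under discussion (hypothetical names, not the actual producer code): always send the urgent topics, and only piggyback non-urgent refreshes up to a target batch size.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    class MetadataBatcher {
        // Urgent topics (e.g. unknown leader/partition) are always
        // requested; non-urgent topics, ordered oldest-refresh-first,
        // fill the remaining budget up to the target batch size.
        static List<String> nextBatch(Set<String> urgentTopics,
                                      List<String> topicsByStaleness,
                                      int targetBatchSize) {
            List<String> batch = new ArrayList<>(urgentTopics);
            for (String topic : topicsByStaleness) {
                if (batch.size() >= targetBatchSize)
                    break;
                if (!urgentTopics.contains(topic))
                    batch.add(topic);
            }
            return batch;
        }
    }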
Thanks,
Brian

On Mon, Jan 6, 2020 at 2:40 PM Brian Byrne <bby...@confluent.io> wrote:
>
> So the performance of a metadata RPC that occurs once every 10 seconds should not be measured strictly in CPU cost, but rather by its effect on the 95th-99th percentile latency. The larger the request is, the more opportunity there is to put a burst stress on the producer and broker, and the larger the response payload to push through the control plane socket. Maybe that's not at 5k topics, but there are groups that are at 10k+ topics and pushing further.
>
> There's definitely weight to the metadata RPCs. Looking at a previous local, non-loaded test I ran, I calculate about 2 microseconds of per-partition latency to the producer. At 10,000 topics with 100 partitions each, that's a full 2-second bubble in the best case. I can rerun a more targeted performance test, but I feel that's missing the point.
>
> If you're arguing that the batch sizes are too small or the time interval too short, that's fine, and I'll be happy to increase them. However, there is a breaking point that justifies refreshing a subset of a producer's topics. Even if producers outnumber brokers by 10x, the fixed overhead cost of an RPC that occurs every second is probably not worth scraping for performance, but is worth sanitizing.
>
> Brian
>
> On Mon, Jan 6, 2020 at 1:31 PM Colin McCabe <cmcc...@apache.org> wrote:
>
>> On Mon, Jan 6, 2020, at 13:07, Brian Byrne wrote:
>> > Hi Colin,
>> >
>> > Thanks again for the feedback!
>> >
>> > On Mon, Jan 6, 2020 at 12:07 PM Colin McCabe <cmcc...@apache.org> wrote:
>> >
>> > > Metadata requests don't (always) go to the controller, right? We should fix the wording in this section.
>> >
>> > You're correct, s/controller/broker(s)/.
>> >
>> > > I feel like "Proposed Changes" should come before "Public Interfaces" here. The new configuration won't make sense to the reader until he or she has read the "changes" section. Also, it's not clear from the name that "metadata evict" refers to a span of time. What do you think about "metadata.eviction.period.ms" as a configuration name?
>> >
>> > Sure, makes sense. Updated order and config name.
>> >
>> > > Where is the "10 seconds" coming from here? The current default for metadata.max.age.ms is 5 * 60 * 1000 ms, which implies that we want to refresh every 5 minutes. Definitely not every 10 seconds.
>> >
>> > The 10 seconds is another arbitrary value to prevent a large number of RPCs if the target batch size were fixed at 20. For example, if there are 5,000 topics with a 5-minute interval, then instead of issuing an RPC every 1.2 seconds with a batch size of 20, it would issue an RPC every 10 seconds with a batch size of 167.
>>
>> Hmm. This will lead to many more RPCs compared to the current situation of issuing an RPC every 5 minutes with 5,000 topics, right? See below for more discussion.
>>
>> > > Stepping back a little bit, it seems like the big problem you identified is the O(N^2) behavior of producing to X, then Y, then Z, etc., where each new produce to a fresh topic triggers a metadata request with all the preceding topics included.
>> > >
>> > > Of course we need to send out a metadata request before producing to X, then Y, then Z, but that request could just specify X, or just specify Y, etc.
>> > > In other words, we could decouple the routine metadata fetch, which happens on a 5-minute timer, from the need to fetch metadata for something specific right now.
>> > >
>> > > I guess my question is, do we really need to allow routine metadata fetches to "piggyback" on the emergency metadata fetches? It adds a lot of complexity, and we don't have any benchmarks proving that it's better. Also, as I understand it, whether we piggyback or not, the number of metadata fetches is the same, right?
>> >
>> > So it's possible to do as you suggest, but I would argue that it'd be more complex given how the code is structured, and that piggybacking wouldn't add any extra complexity. The derived metadata classes effectively respond to a notification that a metadata RPC is going to be issued, where they return the metadata request structure with topics to refresh, which is decoupled from what generated the event (new topic, stale metadata, periodic refresh alarm). There is also a strict implementation detail that only one metadata request can be outstanding at any time, which lends itself to consolidating complexity in the base metadata class and having the derived classes use the "listen for next update" model.
>> >
>> > By maintaining a list of topics ordered by their last metadata refresh time (0 for new topics), it's a matter of pulling from the front of the list to see which topics should be included in the next update. Always include all urgent topics, then include non-urgent topics (i.e. those that need to be refreshed soon-ish) up to the target batch size.
>> >
>> > The number of metadata fetches could be reduced. Assuming a target batch size of 20, a new topic might also refresh 19 other "refresh soon" topics in the same RPC, as opposed to those 19 being handled in a subsequent RPC.
>> >
>> > Although to counter the above, the batching/piggybacking logic isn't necessarily about reducing the total number of RPCs, but rather about distributing the load more evenly over time. For example, if a large number of topics need to be refreshed at approximately the same time (common in startup cases that hit a large number of topics), the updates are more evenly distributed to avoid a flood.
>>
>> It wouldn't be a flood in the current case, right? It would just be a single metadata request for a lot of topics.
>>
>> Let's compare the two cases. In the current scenario, we have 1 metadata request every 5 minutes. This request is for 5,000 topics (let's say). In the new scenario, we have a request every 10 seconds for 167 topics each.
>>
>> Which do you think will be more expensive? I think the second scenario certainly will, because of the overhead of 30x as many requests sent over the wire. Metadata accesses are now lockless, so the big metadata request just isn't that much of a problem. I bet if you benchmark it, sending back metadata for 167 topics won't be that much cheaper than sending back metadata for 5k. Certainly not 30x cheaper. There will eventually be a point where we need to split metadata requests, but it's definitely not at 5,000 topics.
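Spelling out the arithmetic being compared (with metadata.max.age.ms = 5 minutes = 300 seconds and 5,000 topics):

    current:  300 s / 300 s = 1 RPC per refresh period, carrying all 5,000 topics
    proposed: 300 s / 10 s  = 30 RPCs per refresh period, each carrying ~5,000 / 30 = ~167 topics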
>>
>> regards,
>> Colin
>>
>> >
>> > Brian
>> >
>> > > On Mon, Jan 6, 2020, at 10:26, Lucas Bradstreet wrote:
>> > > > +1 (non-binding)
>> > > >
>> > > > On Thu, Jan 2, 2020 at 11:15 AM Brian Byrne <bby...@confluent.io> wrote:
>> > > >
>> > > > > Hello all,
>> > > > >
>> > > > > After further discussion and improvements, I'd like to reinstate the voting process.
>> > > > >
>> > > > > The updated KIP:
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics
>> > > > >
>> > > > > The continued discussion:
>> > > > > https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E
>> > > > >
>> > > > > I'd be happy to address any further comments/feedback.
>> > > > >
>> > > > > Thanks,
>> > > > > Brian
>> > > > >
>> > > > > On Mon, Dec 9, 2019 at 11:02 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > >
>> > > > > > With the concluded summary on the other discussion thread, I'm +1 on the proposal.
>> > > > > >
>> > > > > > Thanks Brian!
>> > > > > >
>> > > > > > On Tue, Nov 19, 2019 at 8:00 PM deng ziming <dengziming1...@gmail.com> wrote:
>> > > > > >
>> > > > > > > > For new (uncached) topics, one problem here is that we don't know which partition to map a record to in the event that it has a key or custom partitioner, so the RecordAccumulator wouldn't know which batch/broker it belongs to. We'd need an intermediate record queue that subsequently moved the records into RecordAccumulators once metadata resolution was complete. For known topics, we don't currently block at all in waitOnMetadata.
>> > > > > > >
>> > > > > > > You are right, I forgot this fact, and the intermediate record queue will help, but I have some questions.
>> > > > > > >
>> > > > > > > If we add an intermediate record queue in KafkaProducer, when should we move the records into RecordAccumulators? Only NetworkClient is aware of the MetadataResponse; here is the hierarchical structure of the related classes:
>> > > > > > >   KafkaProducer
>> > > > > > >     Accumulator
>> > > > > > >     Sender
>> > > > > > >       NetworkClient
>> > > > > > >         metadataUpdater.handleCompletedMetadataResponse
>> > > > > > >
>> > > > > > > So:
>> > > > > > > 1. should we also add a metadataUpdater to KafkaProducer?
>> > > > > > > 2. what if the topic really does not exist? will the intermediate record queue become too large?
>> > > > > > > 3. should we `block` when the intermediate record queue is too large? wouldn't this bring back the blocking problem?
>> > > > > > >
>> > > > > > > On Wed, Nov 20, 2019 at 12:40 AM Brian Byrne <bby...@confluent.io> wrote:
>> > > > > > >
>> > > > > > > > Hi Deng,
>> > > > > > > >
>> > > > > > > > Thanks for the feedback.
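One way to picture the intermediate record queue, as a minimal sketch with stand-in types (byte[] records and an append callback) rather than a working design:

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.function.BiConsumer;

    // Sketch: records for topics whose metadata is unresolved are
    // parked here instead of going straight to the RecordAccumulator.
    class PendingRecords {
        private final Map<String, Queue<byte[]>> parked = new ConcurrentHashMap<>();

        void park(String topic, byte[] record) {
            parked.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(record);
        }

        // Invoked once a metadata update resolves partitioning for a
        // topic; drains parked records into the real accumulator via
        // the supplied append callback.
        void onMetadataResolved(String topic, BiConsumer<String, byte[]> append) {
            Queue<byte[]> records = parked.remove(topic);
            if (records != null)
                records.forEach(r -> append.accept(topic, r));
        }
    }

The questions above (who observes the MetadataResponse, whether the queue can grow unbounded when a topic doesn't exist, and whether bounding it reintroduces blocking) all still apply to this sketch.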
>> > > > > > > >
>> > > > > > > > On Mon, Nov 18, 2019 at 6:56 PM deng ziming <dengziming1...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hi, I reviewed the current code. The ProducerMetadata maintains an expiry threshold for every topic, and every time we write to a topic we set the expiry time to -1 to indicate it should be updated. This does work to reduce the size of the topic working set, but the producer will continue fetching metadata for these topics in every metadata request for the full expiry duration.
>> > > > > > > >
>> > > > > > > > Indeed, you are correct, I terribly misread the code here. Fortunately this was only a minor optimization in the KIP that's no longer necessary.
>> > > > > > > >
>> > > > > > > > > And we can improve the situation by 2 means:
>> > > > > > > > > 1. we maintain a refresh threshold for every topic, which is for example 0.8 * expiry_threshold, and when we send a `MetadataRequest` to brokers we just request unknownLeaderTopics + unknownPartitionTopics + topics that reach the refresh threshold.
>> > > > > > > >
>> > > > > > > > Right, this is similar to what I suggested, with a larger window on the "staleness" that permits batching to an appropriate size (except if there are any unknown topics, you'd want to issue the request immediately).
>> > > > > > > >
>> > > > > > > > > 2. we don't invoke KafkaProducer#waitOnMetadata when we call KafkaProducer#send, because we just send data to the RecordAccumulator, and before we send data to brokers we will invoke RecordAccumulator#ready(), so we only need to invoke waitOnMetadata to block when (number of topics that reach the refresh threshold) > (number of all known topics) * 0.2.
>> > > > > > > >
>> > > > > > > > For new (uncached) topics, one problem here is that we don't know which partition to map a record to in the event that it has a key or custom partitioner, so the RecordAccumulator wouldn't know which batch/broker it belongs to. We'd need an intermediate record queue that subsequently moved the records into RecordAccumulators once metadata resolution was complete. For known topics, we don't currently block at all in waitOnMetadata.
>> > > > > > > >
>> > > > > > > > The last major point of minimizing producer startup metadata RPCs may still need to be improved, but this would be a large improvement on the current situation.
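Concretely, the staleness check in (1) could be as small as this sketch (the 0.8 factor is from the example above; names are illustrative):

    class RefreshPolicy {
        // Sketch: a topic becomes eligible for a piggybacked metadata
        // refresh once its metadata age passes a fraction of the expiry
        // threshold. Topics with unknown leaders/partitions would be
        // requested immediately rather than waiting for this check.
        static boolean needsRefresh(long lastRefreshMs, long nowMs, long expiryMs) {
            return nowMs - lastRefreshMs >= (long) (0.8 * expiryMs);
        }
    }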
>> > > > > > > > >> > > > > > > > Thanks, >> > > > > > > > Brian >> > > > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > > I think the above 2 ways are enough to solve the current >> > > problem. >> > > > > > > > > >> > > > > > > > > On Tue, Nov 19, 2019 at 3:20 AM Colin McCabe < >> > > cmcc...@apache.org> >> > > > > > > wrote: >> > > > > > > > > >> > > > > > > > > > On Mon, Nov 18, 2019, at 10:05, Brian Byrne wrote: >> > > > > > > > > > > On Fri, Nov 15, 2019 at 5:08 PM Colin McCabe < >> > > > > cmcc...@apache.org >> > > > > > > >> > > > > > > > > wrote: >> > > > > > > > > > > >> > > > > > > > > > > > Two seconds doesn't seem like a reasonable amount of >> > > time to >> > > > > > > leave >> > > > > > > > > for >> > > > > > > > > > the >> > > > > > > > > > > > metadata fetch. Fetching halfway through the >> expiration >> > > > > period >> > > > > > > > seems >> > > > > > > > > > more >> > > > > > > > > > > > reasonable. It also doesn't require us to create a >> new >> > > > > > > > configuration >> > > > > > > > > > key, >> > > > > > > > > > > > which is nice. >> > > > > > > > > > > > >> > > > > > > > > > > > Another option is to just do the metadata fetch >> every >> > > > > > > > > > metadata.max.age.ms, >> > > > > > > > > > > > but not expire the topic until we can't fetch the >> > > metadata >> > > > > for >> > > > > > 2 >> > > > > > > * >> > > > > > > > > > > > metadata.max.age.ms. >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > I'd expect two seconds to be reasonable in the common >> case. >> > > > > Keep >> > > > > > in >> > > > > > > > > mind >> > > > > > > > > > > that this doesn't affect correctness, and a control >> > > operation >> > > > > > > > returning >> > > > > > > > > > > cached metadata should be on the order of >> milliseconds. >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > Hi Brian, >> > > > > > > > > > >> > > > > > > > > > Thanks again for the KIP. >> > > > > > > > > > >> > > > > > > > > > I think the issue here is not the common case, but the >> > > uncommon >> > > > > > case >> > > > > > > > > where >> > > > > > > > > > the metadata fetch takes longer than expected. In that >> > > case, we >> > > > > > > don't >> > > > > > > > > want >> > > > > > > > > > to be in the position of having our metadata expire >> because >> > > we >> > > > > > waited >> > > > > > > > too >> > > > > > > > > > long to renew it. >> > > > > > > > > > >> > > > > > > > > > This is one reason why I think that the metadata >> expiration >> > > time >> > > > > > > should >> > > > > > > > > be >> > > > > > > > > > longer than the metadata refresh time. In fact, it >> might be >> > > > > worth >> > > > > > > > having >> > > > > > > > > > two separate configuration keys for these two values. I >> > > could >> > > > > > > imagine >> > > > > > > > a >> > > > > > > > > > user who is having trouble with metadata expiration >> wanting >> > > to >> > > > > > > increase >> > > > > > > > > the >> > > > > > > > > > metadata expiration time, but without increasing the >> metadata >> > > > > > refresh >> > > > > > > > > > period. In a sense, the metadata expiration time is >> like >> > > the ZK >> > > > > > > > session >> > > > > > > > > > expiration time. You might want to turn it up if the >> > > cluster is >> > > > > > > > > > experiencing load spikes. 
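As a concrete illustration of the two-knob idea (values made up; metadata.eviction.period.ms is only the name proposed earlier in this thread, not an existing producer config):

    Properties props = new Properties();   // java.util.Properties
    // Refresh cadence: how often topic metadata is proactively re-fetched.
    props.put("metadata.max.age.ms", "300000");           // 5 minutes (current default)
    // Hypothetical eviction knob: how long metadata survives without a
    // successful refresh before being dropped from the working set.
    props.put("metadata.eviction.period.ms", "600000");   // e.g. 2x the refresh period

Turning up only the second value would tolerate slow refreshes during load spikes without changing the steady-state refresh load.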
>> > > > > > > > > > >
>> > > > > > > > > > > But to the general point, defining the algorithm would mean enforcing it to fair accuracy, whereas if the suggestion is that it'll be performed at a reasonable time, it allows for batching and other optimizations. Perhaps I shouldn't be regarding what's defined in a KIP as contractual in these cases, but you could consider a first implementation to collect topics whose metadata age has exceeded (metadata.max.age.ms / 2), and to send the batch once a constituent topic's metadata is near expiry, or a sufficient number of topics has been collected (10? 100? 1000?).
>> > > > > > > > > >
>> > > > > > > > > > I'm concerned that if we change the metadata caching strategy without discussing it first, it may improve certain workloads but make others worse. We need to be concrete about what the proposed strategy is so that we can really evaluate it.
>> > > > > > > > > >
>> > > > > > > > > > > > We should be specific about what happens if the first few metadata fetches fail. Do we use exponential backoff to decide when to resend? It seems like we really should, for all the usual reasons (reduce the load on brokers, ride out temporary service disruptions, etc.). Maybe we could have an exponential retry backoff for each broker (in other words, we should try to contact a different broker before applying the backoff). I think this already sort of happens with the disconnect timeout, but we might need a more general solution.
>> > > > > > > > > > >
>> > > > > > > > > > > I don't plan to change this behavior. Currently it retries after a fixed value of 'retry.backoff.ms' (defaults to 100 ms). It's possible that different brokers are attempted, but I haven't dug into it.
>> > > > > > > > > >
>> > > > > > > > > > I think it's critical to understand what the current behavior is before we try to change it. The difference between retrying the same broker and trying a different one has a large impact on cluster load and latency.
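A per-broker exponential backoff along the lines suggested above might look roughly like this (a sketch, not the current client code):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: track consecutive failures per broker so a broker that
    // just failed backs off exponentially while other brokers remain
    // immediately available to try.
    class BrokerBackoff {
        private final long baseMs;
        private final long maxMs;
        private final Map<Integer, Integer> failures = new HashMap<>();
        private final Map<Integer, Long> lastFailureMs = new HashMap<>();

        BrokerBackoff(long baseMs, long maxMs) {
            this.baseMs = baseMs;
            this.maxMs = maxMs;
        }

        void onFailure(int brokerId, long nowMs) {
            failures.merge(brokerId, 1, Integer::sum);
            lastFailureMs.put(brokerId, nowMs);
        }

        void onSuccess(int brokerId) {
            failures.remove(brokerId);
            lastFailureMs.remove(brokerId);
        }

        // Available if the broker never failed, or its backoff window
        // (baseMs * 2^(failures - 1), capped at maxMs) has elapsed.
        boolean available(int brokerId, long nowMs) {
            Integer n = failures.get(brokerId);
            if (n == null)
                return true;
            long backoff = Math.min(maxMs, baseMs << Math.min(n - 1, 20));
            return nowMs - lastFailureMs.get(brokerId) >= backoff;
        }
    }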
For what it's worth, I believe the behavior >> is the >> > > > > second >> > > > > > > > one, >> > > > > > > > > > but it has been a while since I checked. Let's figure >> this >> > > out. >> > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > Thanks for the clarification. Fully asynchronous >> is the >> > > way >> > > > > to >> > > > > > > > go, I >> > > > > > > > > > > > agree. I'm having trouble understanding how >> timeouts are >> > > > > > handled >> > > > > > > > in >> > > > > > > > > > the >> > > > > > > > > > > > KIP. It seems like if we can't fetch the metadata >> > > within the >> > > > > > > > > > designated >> > > > > > > > > > > > metadata timeout, the future / callback should >> receive a >> > > > > > > > > > TimeoutException >> > > > > > > > > > > > right? We do not want the send call to be deferred >> > > forever >> > > > > if >> > > > > > > > > metadata >> > > > > > > > > > > > can't be fetched. Eventually it should fail if it >> can't >> > > be >> > > > > > > > > performed. >> > > > > > > > > > > > >> > > > > > > > > > > > I do think this is something that will have to be >> > > mentioned >> > > > > in >> > > > > > > the >> > > > > > > > > > > > compatibility section. There is some code out there >> > > that is >> > > > > > > > probably >> > > > > > > > > > > > prepared to handle a timeout exception from the send >> > > > > function, >> > > > > > > > which >> > > > > > > > > > may >> > > > > > > > > > > > need to be updated to check for a timeout from the >> > > future or >> > > > > > > > > callback. >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > Correct, a timeout exception would be delivered in the >> > > future. >> > > > > > > Sure, >> > > > > > > > I >> > > > > > > > > > can >> > > > > > > > > > > add that note to the KIP. >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > Thanks. >> > > > > > > > > > >> > > > > > > > > > best, >> > > > > > > > > > Colin >> > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > It seems like this is an existing problem. You may >> fire >> > > off >> > > > > a >> > > > > > > lot >> > > > > > > > of >> > > > > > > > > > send >> > > > > > > > > > > > calls that get blocked because the broker that is >> the >> > > leader >> > > > > > for >> > > > > > > a >> > > > > > > > > > certain >> > > > > > > > > > > > partition is not responding. I'm not sure that we >> need >> > > to do >> > > > > > > > > anything >> > > > > > > > > > > > special here. On the other hand, we could make the >> case >> > > for >> > > > > a >> > > > > > > > > generic >> > > > > > > > > > "max >> > > > > > > > > > > > number of outstanding sends" configuration to >> prevent >> > > > > surprise >> > > > > > > OOMs >> > > > > > > > > in >> > > > > > > > > > the >> > > > > > > > > > > > existing cases, plus the new one we're adding. But >> this >> > > > > feels >> > > > > > > > like a >> > > > > > > > > > bit >> > > > > > > > > > > > of a scope expansion. >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > Right, this is an existing problem, however the >> > > asynchronous >> > > > > send >> > > > > > > > could >> > > > > > > > > > > cause unexpected behavior. 
For example, if a client >> pinned >> > > > > > > > > > > topics/partitions to individual send threads, then >> memory >> > > > > > couldn't >> > > > > > > be >> > > > > > > > > > > exhausted by a single topic since a blocking send >> would >> > > prevent >> > > > > > > > further >> > > > > > > > > > > records from being buffered on that topic. The >> compromise >> > > could >> > > > > > be >> > > > > > > > that >> > > > > > > > > > we >> > > > > > > > > > > only ever permit one outstanding record batch for a >> topic, >> > > > > which >> > > > > > > > would >> > > > > > > > > > keep >> > > > > > > > > > > the code simple and wouldn't permit a single topic to >> > > consume >> > > > > all >> > > > > > > > > > available >> > > > > > > > > > > memory. >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > They may be connected, but I'm not sure they should >> be >> > > the >> > > > > > same. >> > > > > > > > > > Perhaps >> > > > > > > > > > > > expiry should be 4x the typical fetch rate, for >> example. >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > That's true. You could also make the case for a faster >> > > expiry >> > > > > > than >> > > > > > > > > > refresh >> > > > > > > > > > > as well. Makes sense to separate this out. >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > Hmm.... are you sure this is an N^2 problem? If you >> > > have T1 >> > > > > > and >> > > > > > > > T2, >> > > > > > > > > > you >> > > > > > > > > > > > fetch metadata for T1 and T2, right? I guess you >> could >> > > argue >> > > > > > > that >> > > > > > > > we >> > > > > > > > > > often >> > > > > > > > > > > > fetch metadata for partitions we don't care about, >> but >> > > that >> > > > > > > doesn't >> > > > > > > > > > make it >> > > > > > > > > > > > O(N^2). Maybe there's something about the >> implementation >> > > > > that >> > > > > > > I'm >> > > > > > > > > > missing. >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > My apologies, I left out the context. One issue the >> KIP is >> > > > > trying >> > > > > > > to >> > > > > > > > > > > resolve is the metadata storm that's caused by >> producers >> > > > > starting >> > > > > > > up. >> > > > > > > > > In >> > > > > > > > > > > the worst case, where the send call is only performed >> from >> > > a >> > > > > > single >> > > > > > > > > > thread >> > > > > > > > > > > (i.e. no possible batching), fetching metadata for 1K >> > > topics >> > > > > will >> > > > > > > > > > generate >> > > > > > > > > > > 1K RPCs, with payload 1+2+...+1K topics' metadata. >> Being >> > > smart >> > > > > > > about >> > > > > > > > > the >> > > > > > > > > > > topics being refreshed would still generate 1K RPCs >> for 1 >> > > topic >> > > > > > > each, >> > > > > > > > > and >> > > > > > > > > > > asynchronous behavior would permit batching (note >> > > steady-state >> > > > > > > > > refreshing >> > > > > > > > > > > doesn't require the asynchronous behavior to batch). >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > In general, we need to take advantage of batching >> to do >> > > this >> > > > > > well >> > > > > > > > > (one >> > > > > > > > > > > > reason why I think we should steer clear of >> > > ultra-granular >> > > > > > > timeout >> > > > > > > > > > > > tracking). 
>> > > > > > > > > > > > It's best to do 1 RPC asking for 10 topics, not 10 RPCs asking for a single topic each, even if that means some of the topic timeouts are not *exactly* aligned with the configured value.
>> > > > > > > > > > >
>> > > > > > > > > > > Absolutely, agreed.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > > Brian
>> > > > > > > > > > >
>> > > > > > > > > > > > best,
>> > > > > > > > > > > > Colin
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Brian
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Mon, Nov 11, 2019 at 11:47 AM Colin McCabe <cmcc...@apache.org> wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > Hi Brian,
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks for the KIP.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Starting the metadata fetch before we need the result is definitely a great idea. This way, the metadata fetch can be done in parallel with all the other work the producer is doing, rather than forcing the producer to periodically come to a halt while metadata is fetched.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Maybe I missed it, but there seemed to be some details missing here. When do we start the metadata fetch? For example, if topic metadata expires every 5 minutes, perhaps we should wait 4 minutes, then start fetching the new metadata, which we would expect to arrive by the 5-minute deadline. Or perhaps we should start the fetch even earlier, around the 2.5-minute mark. In any case, there should be some discussion about what the actual policy is. Given that metadata.max.age.ms is configurable, maybe that policy needs to be expressed in terms of a percentage of the refresh period rather than in terms of an absolute delay.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The KIP correctly points out that the current metadata fetching policy causes us to "[block] in a function that's advertised as asynchronous."
>> > > > > > > > > > > > > > However, the KIP doesn't seem to spell out >> whether we >> > > > > will >> > > > > > > > > > continue to >> > > > > > > > > > > > > > block if metadata can't be found, or if this >> will be >> > > > > > > abolished. >> > > > > > > > > > > > Clearly, >> > > > > > > > > > > > > > starting the metadata fetch early will reduce >> > > blocking in >> > > > > > the >> > > > > > > > > > common >> > > > > > > > > > > > case, >> > > > > > > > > > > > > > but will there still be blocking in the >> uncommon case >> > > > > where >> > > > > > > the >> > > > > > > > > > early >> > > > > > > > > > > > fetch >> > > > > > > > > > > > > > doesn't succeed in time? >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > > To address (2), the producer currently >> maintains >> > > an >> > > > > > expiry >> > > > > > > > > > threshold >> > > > > > > > > > > > > > for >> > > > > > > > > > > > > > > every topic, which is used to remove a topic >> from >> > > the >> > > > > > > > working >> > > > > > > > > > set >> > > > > > > > > > > > at a >> > > > > > > > > > > > > > > future time (currently hard-coded to 5 >> minutes, >> > > this >> > > > > > > should >> > > > > > > > be >> > > > > > > > > > > > modified >> > > > > > > > > > > > > > to >> > > > > > > > > > > > > > > use metadata.max.age.ms). While this does >> work to >> > > > > > reduce >> > > > > > > > the >> > > > > > > > > > size >> > > > > > > > > > > > of >> > > > > > > > > > > > > > the >> > > > > > > > > > > > > > > topic working set, the producer will continue >> > > fetching >> > > > > > > > > metadata >> > > > > > > > > > for >> > > > > > > > > > > > > > these >> > > > > > > > > > > > > > > topics in every metadata request for the full >> > > expiry >> > > > > > > > duration. >> > > > > > > > > > This >> > > > > > > > > > > > > > logic >> > > > > > > > > > > > > > > can be made more intelligent by managing the >> > > expiry >> > > > > from >> > > > > > > > when >> > > > > > > > > > the >> > > > > > > > > > > > topic >> > > > > > > > > > > > > > > was last used, enabling the expiry duration >> to be >> > > > > > reduced >> > > > > > > to >> > > > > > > > > > improve >> > > > > > > > > > > > > > cases >> > > > > > > > > > > > > > > where a large number of topics are touched >> > > > > > intermittently. >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > Can you clarify this part a bit? It seems like >> we >> > > have a >> > > > > > > > > metadata >> > > > > > > > > > > > > > expiration policy now for topics, and we will >> have >> > > one >> > > > > > after >> > > > > > > > this >> > > > > > > > > > KIP, >> > > > > > > > > > > > but >> > > > > > > > > > > > > > they will be somewhat different. But it's not >> clear >> > > to >> > > > > me >> > > > > > > what >> > > > > > > > > the >> > > > > > > > > > > > > > differences are. >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > In general, if load is a problem, we should >> probably >> > > > > > consider >> > > > > > > > > > adding >> > > > > > > > > > > > some >> > > > > > > > > > > > > > kind of jitter on the client side. There are >> > > definitely >> > > > > > > cases >> > > > > > > > > > where >> > > > > > > > > > > > people >> > > > > > > > > > > > > > start up a lot of clients at the same time in >> > > parallel >> > > > > and >> > > > > > > > there >> > > > > > > > > > is a >> > > > > > > > > > > > > > thundering herd problem with metadata updates. 
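One illustrative form of such client-side jitter (the 20% spread is made up):

    import java.util.Random;

    class RefreshJitter {
        // Sketch: randomize each client's next refresh deadline within
        // +/-20% of metadata.max.age.ms so that fleets of producers
        // started together don't refresh in lockstep.
        static long nextRefreshDeadlineMs(long nowMs, long maxAgeMs, Random random) {
            double factor = 0.8 + 0.4 * random.nextDouble(); // uniform in [0.8, 1.2)
            return nowMs + (long) (maxAgeMs * factor);
        }
    }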
>> > > Adding >> > > > > > jitter >> > > > > > > > > would >> > > > > > > > > > > > spread >> > > > > > > > > > > > > > the load across time rather than creating a >> spike >> > > every 5 >> > > > > > > > minutes >> > > > > > > > > > in >> > > > > > > > > > > > this >> > > > > > > > > > > > > > case. >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > best, >> > > > > > > > > > > > > > Colin >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > On Fri, Nov 8, 2019, at 08:59, Ismael Juma >> wrote: >> > > > > > > > > > > > > > > I think this KIP affects when we block which >> is >> > > > > actually >> > > > > > > user >> > > > > > > > > > visible >> > > > > > > > > > > > > > > behavior. Right? >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > Ismael >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > On Fri, Nov 8, 2019, 8:50 AM Brian Byrne < >> > > > > > > > bby...@confluent.io> >> > > > > > > > > > > > wrote: >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > Hi Guozhang, >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > Regarding metadata expiry, no access times >> other >> > > than >> > > > > > the >> > > > > > > > > > initial >> > > > > > > > > > > > > > lookup[1] >> > > > > > > > > > > > > > > > are used for determining when to expire >> producer >> > > > > > > metadata. >> > > > > > > > > > > > Therefore, >> > > > > > > > > > > > > > > > frequently used topics' metadata will be >> aged >> > > out and >> > > > > > > > > > subsequently >> > > > > > > > > > > > > > > > refreshed (in a blocking manner) every five >> > > minutes, >> > > > > > and >> > > > > > > > > > > > infrequently >> > > > > > > > > > > > > > used >> > > > > > > > > > > > > > > > topics will be retained for a minimum of >> five >> > > minutes >> > > > > > and >> > > > > > > > > > currently >> > > > > > > > > > > > > > > > refetched on every metadata update during >> that >> > > time >> > > > > > > period. >> > > > > > > > > The >> > > > > > > > > > > > > > sentence is >> > > > > > > > > > > > > > > > suggesting that we could reduce the expiry >> time >> > > to >> > > > > > > improve >> > > > > > > > > the >> > > > > > > > > > > > latter >> > > > > > > > > > > > > > > > without affecting (rather slightly >> improving) the >> > > > > > former. >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > Keep in mind that in most all cases, I >> wouldn't >> > > > > > > anticipate >> > > > > > > > > > much of >> > > > > > > > > > > > a >> > > > > > > > > > > > > > > > difference with producer behavior, and the >> extra >> > > > > logic >> > > > > > > can >> > > > > > > > be >> > > > > > > > > > > > > > implemented >> > > > > > > > > > > > > > > > to have insignificant cost. It's the >> > > large/dynamic >> > > > > > topic >> > > > > > > > > corner >> > > > > > > > > > > > cases >> > > > > > > > > > > > > > that >> > > > > > > > > > > > > > > > we're trying to improve. >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > It'd be convenient if the KIP is no longer >> > > necessary. >> > > > > > > > You're >> > > > > > > > > > right >> > > > > > > > > > > > in >> > > > > > > > > > > > > > that >> > > > > > > > > > > > > > > > there's no public API changes and the >> behavioral >> > > > > > changes >> > > > > > > > are >> > > > > > > > > > > > entirely >> > > > > > > > > > > > > > > > internal. 
I'd be happy to continue the >> discussion >> > > > > > around >> > > > > > > > the >> > > > > > > > > > KIP, >> > > > > > > > > > > > but >> > > > > > > > > > > > > > > > unless otherwise objected, it can be >> retired. >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > [1] Not entirely accurate, it's actually the >> > > first >> > > > > time >> > > > > > > > when >> > > > > > > > > > the >> > > > > > > > > > > > client >> > > > > > > > > > > > > > > > calculates whether to retain the topic in >> its >> > > > > metadata. >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > Thanks, >> > > > > > > > > > > > > > > > Brian >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > On Thu, Nov 7, 2019 at 4:48 PM Guozhang >> Wang < >> > > > > > > > > > wangg...@gmail.com> >> > > > > > > > > > > > > > wrote: >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > Hello Brian, >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > Could you elaborate a bit more on this >> > > sentence: >> > > > > > "This >> > > > > > > > > logic >> > > > > > > > > > can >> > > > > > > > > > > > be >> > > > > > > > > > > > > > made >> > > > > > > > > > > > > > > > > more intelligent by managing the expiry >> from >> > > when >> > > > > the >> > > > > > > > topic >> > > > > > > > > > was >> > > > > > > > > > > > last >> > > > > > > > > > > > > > > > used, >> > > > > > > > > > > > > > > > > enabling the expiry duration to be >> reduced to >> > > > > improve >> > > > > > > > cases >> > > > > > > > > > > > where a >> > > > > > > > > > > > > > large >> > > > > > > > > > > > > > > > > number of topics are touched >> intermittently." >> > > Not >> > > > > > sure >> > > > > > > I >> > > > > > > > > > fully >> > > > > > > > > > > > > > understand >> > > > > > > > > > > > > > > > > the proposal. >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > Also since now this KIP did not make any >> > > public API >> > > > > > > > changes >> > > > > > > > > > and >> > > > > > > > > > > > the >> > > > > > > > > > > > > > > > > behavioral changes are not considered a >> public >> > > API >> > > > > > > > contract >> > > > > > > > > > (i.e. >> > > > > > > > > > > > > > how we >> > > > > > > > > > > > > > > > > maintain the topic metadata in producer >> cache >> > > is >> > > > > > never >> > > > > > > > > > committed >> > > > > > > > > > > > to >> > > > > > > > > > > > > > > > users), >> > > > > > > > > > > > > > > > > I wonder if we still need a KIP for the >> > > proposed >> > > > > > change >> > > > > > > > any >> > > > > > > > > > more? >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > Guozhang >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > On Thu, Nov 7, 2019 at 12:43 PM Brian >> Byrne < >> > > > > > > > > > bby...@confluent.io >> > > > > > > > > > > > > >> > > > > > > > > > > > > > wrote: >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > Hello all, >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > I'd like to propose a vote for a >> producer >> > > change >> > > > > to >> > > > > > > > > improve >> > > > > > > > > > > > > > producer >> > > > > > > > > > > > > > > > > > behavior when dealing with a large >> number of >> > > > > > topics, >> > > > > > > in >> > > > > > > > > > part by >> > > > > > > > > > > > > > > > reducing >> > > > > > > > > > > > > > > > > > the amount of metadata fetching >> performed. 
>> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > The full KIP is provided here: >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-526%3A+Reduce+Producer+Metadata+Lookups+for+Large+Number+of+Topics >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > And the discussion thread: >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > >> https://lists.apache.org/thread.html/b2f8f830ef04587144cf0840c7d4811bbf0a14f3c459723dbc5acf9e@%3Cdev.kafka.apache.org%3E >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > Thanks, >> > > > > > > > > > > > > > > > > > Brian >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > -- >> > > > > > > > > > > > > > > > > -- Guozhang >> > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > > >> > > > > > -- >> > > > > > -- Guozhang >> > > > > > >> > > > > >> > > > >> > > >> > >> >