Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Sorry I missed that comment on the thread. Proposal looks great, thanks, Abhijeet! On Sat, 16 Mar 2024 at 13:19, Abhijeet Kumar wrote: > Hi Jorge, > > The configs name was chosen to keep it consistent with the other existing > quota configs, such as > *replica.alter.log.dirs.io.max.bytes.per.second* as pointed out by Jun in > the thread. > > Also, we can revisit the names of the components during implementation, > since those are not exposed to the user. > > Please let me know if you have any further concerns. > > Regards, > Abhijeet. > > > > On Mon, Mar 11, 2024 at 6:11 PM Jorge Esteban Quilcate Otoya < > quilcate.jo...@gmail.com> wrote: > > > Hi Abhijeet, > > > > Thanks for the KIP! Looks good to me. I just have a minor comments on > > naming: > > > > Would it be work to align the config names to existing quota names? > > e.g. `remote.log.manager.copy.byte.rate.quota` (or similar) instead of > > `remote.log.manager.copy.max.bytes.per.second`? > > > > Same for new components, could we use the same verbs as in the configs: > > - RLMCopyQuotaManager > > - RLMFetchQuotaManager > > > > > > On Fri, 8 Mar 2024 at 13:43, Abhijeet Kumar > > wrote: > > > > > Thank you all for your comments. As all the comments in the thread are > > > addressed, I am starting a Vote thread for the KIP. Please have a look. > > > > > > Regards. > > > > > > On Thu, Mar 7, 2024 at 12:34 PM Luke Chen wrote: > > > > > > > Hi Abhijeet, > > > > > > > > Thanks for the update and the explanation. > > > > I had another look, and it LGTM now! > > > > > > > > Thanks. > > > > Luke > > > > > > > > On Tue, Mar 5, 2024 at 2:50 AM Jun Rao > > wrote: > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > Thanks for the reply. Sounds good to me. > > > > > > > > > > Jun > > > > > > > > > > > > > > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar < > > > > abhijeet.cse@gmail.com> > > > > > wrote: > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > Thanks for pointing it out. It makes sense to me. We can have the > > > > > following > > > > > > metrics instead. What do you think? > > > > > > > > > > > >- remote-(fetch|copy)-throttle-time-avg (The average time in > ms > > > > remote > > > > > >fetches/copies was throttled by a broker) > > > > > >- remote-(fetch|copy)-throttle-time--max (The maximum time in > ms > > > > > remote > > > > > >fetches/copies was throttled by a broker) > > > > > > > > > > > > These are similar to fetch-throttle-time-avg and > > > > fetch-throttle-time-max > > > > > > metrics we have for Kafak Consumers? > > > > > > The Avg and Max are computed over the (sliding) window as defined > > by > > > > the > > > > > > configuration metrics.sample.window.ms on the server. > > > > > > > > > > > > (Also, I will update the config and metric names to be > consistent) > > > > > > > > > > > > Regards. > > > > > > > > > > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao > > > > > > wrote: > > > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > The issue with recording the throttle time as a gauge is that > > it's > > > > > > > transient. If the metric is not read immediately, the recorded > > > value > > > > > > could > > > > > > > be reset to 0. The admin won't realize that throttling has > > > happened. > > > > > > > > > > > > > > For client quotas, the throttle time is tracked as the average > > > > > > > throttle-time per user/client-id. This makes the metric less > > > > transient. > > > > > > > > > > > > > > Also, the configs use read/write whereas the metrics use > > > fetch/copy. > > > > > > Could > > > > > > > we make them consistent? > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar < > > > > > > abhijeet.cse@gmail.com > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > Clarified the meaning of the two metrics. Also updated the > KIP. > > > > > > > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > > > name=RemoteFetchThrottleTime > > > > > -> > > > > > > > The > > > > > > > > duration of time required at a given moment to bring the > > observed > > > > > fetch > > > > > > > > rate within the allowed limit, by preventing further reads. > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > > name=RemoteCopyThrottleTime > > > > > -> > > > > > > > The > > > > > > > > duration of time required at a given moment to bring the > > observed > > > > > > remote > > > > > > > > copy rate within the allowed limit, by preventing further > > copies. > > > > > > > > > > > > > > > > Regards. > > > > > > > > > > > > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > > > > > > > Thanks for the explanation. Makes sense to me now. > > > > > > > > > > > > > > > > > > Just a minor comment. Could you document the exact meaning > of > > >
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi Jorge, The configs name was chosen to keep it consistent with the other existing quota configs, such as *replica.alter.log.dirs.io.max.bytes.per.second* as pointed out by Jun in the thread. Also, we can revisit the names of the components during implementation, since those are not exposed to the user. Please let me know if you have any further concerns. Regards, Abhijeet. On Mon, Mar 11, 2024 at 6:11 PM Jorge Esteban Quilcate Otoya < quilcate.jo...@gmail.com> wrote: > Hi Abhijeet, > > Thanks for the KIP! Looks good to me. I just have a minor comments on > naming: > > Would it be work to align the config names to existing quota names? > e.g. `remote.log.manager.copy.byte.rate.quota` (or similar) instead of > `remote.log.manager.copy.max.bytes.per.second`? > > Same for new components, could we use the same verbs as in the configs: > - RLMCopyQuotaManager > - RLMFetchQuotaManager > > > On Fri, 8 Mar 2024 at 13:43, Abhijeet Kumar > wrote: > > > Thank you all for your comments. As all the comments in the thread are > > addressed, I am starting a Vote thread for the KIP. Please have a look. > > > > Regards. > > > > On Thu, Mar 7, 2024 at 12:34 PM Luke Chen wrote: > > > > > Hi Abhijeet, > > > > > > Thanks for the update and the explanation. > > > I had another look, and it LGTM now! > > > > > > Thanks. > > > Luke > > > > > > On Tue, Mar 5, 2024 at 2:50 AM Jun Rao > wrote: > > > > > > > Hi, Abhijeet, > > > > > > > > Thanks for the reply. Sounds good to me. > > > > > > > > Jun > > > > > > > > > > > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar < > > > abhijeet.cse@gmail.com> > > > > wrote: > > > > > > > > > Hi Jun, > > > > > > > > > > Thanks for pointing it out. It makes sense to me. We can have the > > > > following > > > > > metrics instead. What do you think? > > > > > > > > > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms > > > remote > > > > >fetches/copies was throttled by a broker) > > > > >- remote-(fetch|copy)-throttle-time--max (The maximum time in ms > > > > remote > > > > >fetches/copies was throttled by a broker) > > > > > > > > > > These are similar to fetch-throttle-time-avg and > > > fetch-throttle-time-max > > > > > metrics we have for Kafak Consumers? > > > > > The Avg and Max are computed over the (sliding) window as defined > by > > > the > > > > > configuration metrics.sample.window.ms on the server. > > > > > > > > > > (Also, I will update the config and metric names to be consistent) > > > > > > > > > > Regards. > > > > > > > > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao > > > > wrote: > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > The issue with recording the throttle time as a gauge is that > it's > > > > > > transient. If the metric is not read immediately, the recorded > > value > > > > > could > > > > > > be reset to 0. The admin won't realize that throttling has > > happened. > > > > > > > > > > > > For client quotas, the throttle time is tracked as the average > > > > > > throttle-time per user/client-id. This makes the metric less > > > transient. > > > > > > > > > > > > Also, the configs use read/write whereas the metrics use > > fetch/copy. > > > > > Could > > > > > > we make them consistent? > > > > > > > > > > > > Jun > > > > > > > > > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar < > > > > > abhijeet.cse@gmail.com > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > Clarified the meaning of the two metrics. Also updated the KIP. > > > > > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > > name=RemoteFetchThrottleTime > > > > -> > > > > > > The > > > > > > > duration of time required at a given moment to bring the > observed > > > > fetch > > > > > > > rate within the allowed limit, by preventing further reads. > > > > > > > kafka.log.remote:type=RemoteLogManager, > > name=RemoteCopyThrottleTime > > > > -> > > > > > > The > > > > > > > duration of time required at a given moment to bring the > observed > > > > > remote > > > > > > > copy rate within the allowed limit, by preventing further > copies. > > > > > > > > > > > > > > Regards. > > > > > > > > > > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > > > > > Thanks for the explanation. Makes sense to me now. > > > > > > > > > > > > > > > > Just a minor comment. Could you document the exact meaning of > > the > > > > > > > following > > > > > > > > two metrics? For example, is the time accumulated? If so, is > it > > > > from > > > > > > the > > > > > > > > start of the broker or over some window? > > > > > > > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > > > name=RemoteFetchThrottleTime > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > > name=RemoteCopyThrottleTime > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > >
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi Abhijeet, Thanks for the KIP! Looks good to me. I just have a minor comments on naming: Would it be work to align the config names to existing quota names? e.g. `remote.log.manager.copy.byte.rate.quota` (or similar) instead of `remote.log.manager.copy.max.bytes.per.second`? Same for new components, could we use the same verbs as in the configs: - RLMCopyQuotaManager - RLMFetchQuotaManager On Fri, 8 Mar 2024 at 13:43, Abhijeet Kumar wrote: > Thank you all for your comments. As all the comments in the thread are > addressed, I am starting a Vote thread for the KIP. Please have a look. > > Regards. > > On Thu, Mar 7, 2024 at 12:34 PM Luke Chen wrote: > > > Hi Abhijeet, > > > > Thanks for the update and the explanation. > > I had another look, and it LGTM now! > > > > Thanks. > > Luke > > > > On Tue, Mar 5, 2024 at 2:50 AM Jun Rao wrote: > > > > > Hi, Abhijeet, > > > > > > Thanks for the reply. Sounds good to me. > > > > > > Jun > > > > > > > > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar < > > abhijeet.cse@gmail.com> > > > wrote: > > > > > > > Hi Jun, > > > > > > > > Thanks for pointing it out. It makes sense to me. We can have the > > > following > > > > metrics instead. What do you think? > > > > > > > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms > > remote > > > >fetches/copies was throttled by a broker) > > > >- remote-(fetch|copy)-throttle-time--max (The maximum time in ms > > > remote > > > >fetches/copies was throttled by a broker) > > > > > > > > These are similar to fetch-throttle-time-avg and > > fetch-throttle-time-max > > > > metrics we have for Kafak Consumers? > > > > The Avg and Max are computed over the (sliding) window as defined by > > the > > > > configuration metrics.sample.window.ms on the server. > > > > > > > > (Also, I will update the config and metric names to be consistent) > > > > > > > > Regards. > > > > > > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao > > > wrote: > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > Thanks for the reply. > > > > > > > > > > The issue with recording the throttle time as a gauge is that it's > > > > > transient. If the metric is not read immediately, the recorded > value > > > > could > > > > > be reset to 0. The admin won't realize that throttling has > happened. > > > > > > > > > > For client quotas, the throttle time is tracked as the average > > > > > throttle-time per user/client-id. This makes the metric less > > transient. > > > > > > > > > > Also, the configs use read/write whereas the metrics use > fetch/copy. > > > > Could > > > > > we make them consistent? > > > > > > > > > > Jun > > > > > > > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar < > > > > abhijeet.cse@gmail.com > > > > > > > > > > > wrote: > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > Clarified the meaning of the two metrics. Also updated the KIP. > > > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > name=RemoteFetchThrottleTime > > > -> > > > > > The > > > > > > duration of time required at a given moment to bring the observed > > > fetch > > > > > > rate within the allowed limit, by preventing further reads. > > > > > > kafka.log.remote:type=RemoteLogManager, > name=RemoteCopyThrottleTime > > > -> > > > > > The > > > > > > duration of time required at a given moment to bring the observed > > > > remote > > > > > > copy rate within the allowed limit, by preventing further copies. > > > > > > > > > > > > Regards. > > > > > > > > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao > > > > > > > > wrote: > > > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > > > Thanks for the explanation. Makes sense to me now. > > > > > > > > > > > > > > Just a minor comment. Could you document the exact meaning of > the > > > > > > following > > > > > > > two metrics? For example, is the time accumulated? If so, is it > > > from > > > > > the > > > > > > > start of the broker or over some window? > > > > > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > > name=RemoteFetchThrottleTime > > > > > > > kafka.log.remote:type=RemoteLogManager, > > name=RemoteCopyThrottleTime > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar < > > > > > > abhijeet.cse@gmail.com > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > The existing quota system for consumers is designed to > throttle > > > the > > > > > > > > consumer if it exceeds the allowed fetch rate. > > > > > > > > The additional quota we want to add works on the broker > level. > > If > > > > the > > > > > > > > broker-level remote read quota is being > > > > > > > > exceeded, we prevent additional reads from the remote storage > > but > > > > do > > > > > > not > > > > > > > > prevent local reads for the consumer. > > > > > > > > If the consumer has specified other partitions to read, which > > can > > > > be > > > > > > > served
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Thank you all for your comments. As all the comments in the thread are addressed, I am starting a Vote thread for the KIP. Please have a look. Regards. On Thu, Mar 7, 2024 at 12:34 PM Luke Chen wrote: > Hi Abhijeet, > > Thanks for the update and the explanation. > I had another look, and it LGTM now! > > Thanks. > Luke > > On Tue, Mar 5, 2024 at 2:50 AM Jun Rao wrote: > > > Hi, Abhijeet, > > > > Thanks for the reply. Sounds good to me. > > > > Jun > > > > > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar < > abhijeet.cse@gmail.com> > > wrote: > > > > > Hi Jun, > > > > > > Thanks for pointing it out. It makes sense to me. We can have the > > following > > > metrics instead. What do you think? > > > > > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms > remote > > >fetches/copies was throttled by a broker) > > >- remote-(fetch|copy)-throttle-time--max (The maximum time in ms > > remote > > >fetches/copies was throttled by a broker) > > > > > > These are similar to fetch-throttle-time-avg and > fetch-throttle-time-max > > > metrics we have for Kafak Consumers? > > > The Avg and Max are computed over the (sliding) window as defined by > the > > > configuration metrics.sample.window.ms on the server. > > > > > > (Also, I will update the config and metric names to be consistent) > > > > > > Regards. > > > > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao > > wrote: > > > > > > > Hi, Abhijeet, > > > > > > > > Thanks for the reply. > > > > > > > > The issue with recording the throttle time as a gauge is that it's > > > > transient. If the metric is not read immediately, the recorded value > > > could > > > > be reset to 0. The admin won't realize that throttling has happened. > > > > > > > > For client quotas, the throttle time is tracked as the average > > > > throttle-time per user/client-id. This makes the metric less > transient. > > > > > > > > Also, the configs use read/write whereas the metrics use fetch/copy. > > > Could > > > > we make them consistent? > > > > > > > > Jun > > > > > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar < > > > abhijeet.cse@gmail.com > > > > > > > > > wrote: > > > > > > > > > Hi Jun, > > > > > > > > > > Clarified the meaning of the two metrics. Also updated the KIP. > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > name=RemoteFetchThrottleTime > > -> > > > > The > > > > > duration of time required at a given moment to bring the observed > > fetch > > > > > rate within the allowed limit, by preventing further reads. > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime > > -> > > > > The > > > > > duration of time required at a given moment to bring the observed > > > remote > > > > > copy rate within the allowed limit, by preventing further copies. > > > > > > > > > > Regards. > > > > > > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao > > > > > wrote: > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > Thanks for the explanation. Makes sense to me now. > > > > > > > > > > > > Just a minor comment. Could you document the exact meaning of the > > > > > following > > > > > > two metrics? For example, is the time accumulated? If so, is it > > from > > > > the > > > > > > start of the broker or over some window? > > > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > > name=RemoteFetchThrottleTime > > > > > > kafka.log.remote:type=RemoteLogManager, > name=RemoteCopyThrottleTime > > > > > > > > > > > > Jun > > > > > > > > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar < > > > > > abhijeet.cse@gmail.com > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > The existing quota system for consumers is designed to throttle > > the > > > > > > > consumer if it exceeds the allowed fetch rate. > > > > > > > The additional quota we want to add works on the broker level. > If > > > the > > > > > > > broker-level remote read quota is being > > > > > > > exceeded, we prevent additional reads from the remote storage > but > > > do > > > > > not > > > > > > > prevent local reads for the consumer. > > > > > > > If the consumer has specified other partitions to read, which > can > > > be > > > > > > served > > > > > > > from local, it can continue to read those > > > > > > > partitions. To elaborate more, we make a check for quota > exceeded > > > if > > > > we > > > > > > > know a segment needs to be read from > > > > > > > remote. If the quota is exceeded, we simply skip the partition > > and > > > > move > > > > > > to > > > > > > > other segments in the fetch request. > > > > > > > This way consumers can continue to read the local data as long > as > > > > they > > > > > > have > > > > > > > not exceeded the client-level quota. > > > > > > > > > > > > > > Also, when we choose the appropriate consumer-level quota, we > > would > > > > > > > typically look at what kind of local fetch > > > > > > > throughput is supported. If we were to reuse
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi Abhijeet, Thanks for the update and the explanation. I had another look, and it LGTM now! Thanks. Luke On Tue, Mar 5, 2024 at 2:50 AM Jun Rao wrote: > Hi, Abhijeet, > > Thanks for the reply. Sounds good to me. > > Jun > > > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar > wrote: > > > Hi Jun, > > > > Thanks for pointing it out. It makes sense to me. We can have the > following > > metrics instead. What do you think? > > > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms remote > >fetches/copies was throttled by a broker) > >- remote-(fetch|copy)-throttle-time--max (The maximum time in ms > remote > >fetches/copies was throttled by a broker) > > > > These are similar to fetch-throttle-time-avg and fetch-throttle-time-max > > metrics we have for Kafak Consumers? > > The Avg and Max are computed over the (sliding) window as defined by the > > configuration metrics.sample.window.ms on the server. > > > > (Also, I will update the config and metric names to be consistent) > > > > Regards. > > > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao > wrote: > > > > > Hi, Abhijeet, > > > > > > Thanks for the reply. > > > > > > The issue with recording the throttle time as a gauge is that it's > > > transient. If the metric is not read immediately, the recorded value > > could > > > be reset to 0. The admin won't realize that throttling has happened. > > > > > > For client quotas, the throttle time is tracked as the average > > > throttle-time per user/client-id. This makes the metric less transient. > > > > > > Also, the configs use read/write whereas the metrics use fetch/copy. > > Could > > > we make them consistent? > > > > > > Jun > > > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar < > > abhijeet.cse@gmail.com > > > > > > > wrote: > > > > > > > Hi Jun, > > > > > > > > Clarified the meaning of the two metrics. Also updated the KIP. > > > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime > -> > > > The > > > > duration of time required at a given moment to bring the observed > fetch > > > > rate within the allowed limit, by preventing further reads. > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime > -> > > > The > > > > duration of time required at a given moment to bring the observed > > remote > > > > copy rate within the allowed limit, by preventing further copies. > > > > > > > > Regards. > > > > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao > > > wrote: > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > Thanks for the explanation. Makes sense to me now. > > > > > > > > > > Just a minor comment. Could you document the exact meaning of the > > > > following > > > > > two metrics? For example, is the time accumulated? If so, is it > from > > > the > > > > > start of the broker or over some window? > > > > > > > > > > kafka.log.remote:type=RemoteLogManager, > name=RemoteFetchThrottleTime > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime > > > > > > > > > > Jun > > > > > > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar < > > > > abhijeet.cse@gmail.com > > > > > > > > > > > wrote: > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > The existing quota system for consumers is designed to throttle > the > > > > > > consumer if it exceeds the allowed fetch rate. > > > > > > The additional quota we want to add works on the broker level. If > > the > > > > > > broker-level remote read quota is being > > > > > > exceeded, we prevent additional reads from the remote storage but > > do > > > > not > > > > > > prevent local reads for the consumer. > > > > > > If the consumer has specified other partitions to read, which can > > be > > > > > served > > > > > > from local, it can continue to read those > > > > > > partitions. To elaborate more, we make a check for quota exceeded > > if > > > we > > > > > > know a segment needs to be read from > > > > > > remote. If the quota is exceeded, we simply skip the partition > and > > > move > > > > > to > > > > > > other segments in the fetch request. > > > > > > This way consumers can continue to read the local data as long as > > > they > > > > > have > > > > > > not exceeded the client-level quota. > > > > > > > > > > > > Also, when we choose the appropriate consumer-level quota, we > would > > > > > > typically look at what kind of local fetch > > > > > > throughput is supported. If we were to reuse the same consumer > > quota, > > > > we > > > > > > should also consider the throughput > > > > > > the remote storage supports. The throughput supported by remote > may > > > be > > > > > > less/more than the throughput supported > > > > > > by local (when using a cloud provider, it depends on the plan > opted > > > by > > > > > the > > > > > > user). The consumer quota has to be carefully > > > > > > set considering both local and remote throughput. Instead, if we > > > have a > > > > > > separate quota, it makes things much simpler > > > >
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi, Abhijeet, Thanks for the reply. Sounds good to me. Jun On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar wrote: > Hi Jun, > > Thanks for pointing it out. It makes sense to me. We can have the following > metrics instead. What do you think? > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms remote >fetches/copies was throttled by a broker) >- remote-(fetch|copy)-throttle-time--max (The maximum time in ms remote >fetches/copies was throttled by a broker) > > These are similar to fetch-throttle-time-avg and fetch-throttle-time-max > metrics we have for Kafak Consumers? > The Avg and Max are computed over the (sliding) window as defined by the > configuration metrics.sample.window.ms on the server. > > (Also, I will update the config and metric names to be consistent) > > Regards. > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao wrote: > > > Hi, Abhijeet, > > > > Thanks for the reply. > > > > The issue with recording the throttle time as a gauge is that it's > > transient. If the metric is not read immediately, the recorded value > could > > be reset to 0. The admin won't realize that throttling has happened. > > > > For client quotas, the throttle time is tracked as the average > > throttle-time per user/client-id. This makes the metric less transient. > > > > Also, the configs use read/write whereas the metrics use fetch/copy. > Could > > we make them consistent? > > > > Jun > > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar < > abhijeet.cse@gmail.com > > > > > wrote: > > > > > Hi Jun, > > > > > > Clarified the meaning of the two metrics. Also updated the KIP. > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime -> > > The > > > duration of time required at a given moment to bring the observed fetch > > > rate within the allowed limit, by preventing further reads. > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime -> > > The > > > duration of time required at a given moment to bring the observed > remote > > > copy rate within the allowed limit, by preventing further copies. > > > > > > Regards. > > > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao > > wrote: > > > > > > > Hi, Abhijeet, > > > > > > > > Thanks for the explanation. Makes sense to me now. > > > > > > > > Just a minor comment. Could you document the exact meaning of the > > > following > > > > two metrics? For example, is the time accumulated? If so, is it from > > the > > > > start of the broker or over some window? > > > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime > > > > > > > > Jun > > > > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar < > > > abhijeet.cse@gmail.com > > > > > > > > > wrote: > > > > > > > > > Hi Jun, > > > > > > > > > > The existing quota system for consumers is designed to throttle the > > > > > consumer if it exceeds the allowed fetch rate. > > > > > The additional quota we want to add works on the broker level. If > the > > > > > broker-level remote read quota is being > > > > > exceeded, we prevent additional reads from the remote storage but > do > > > not > > > > > prevent local reads for the consumer. > > > > > If the consumer has specified other partitions to read, which can > be > > > > served > > > > > from local, it can continue to read those > > > > > partitions. To elaborate more, we make a check for quota exceeded > if > > we > > > > > know a segment needs to be read from > > > > > remote. If the quota is exceeded, we simply skip the partition and > > move > > > > to > > > > > other segments in the fetch request. > > > > > This way consumers can continue to read the local data as long as > > they > > > > have > > > > > not exceeded the client-level quota. > > > > > > > > > > Also, when we choose the appropriate consumer-level quota, we would > > > > > typically look at what kind of local fetch > > > > > throughput is supported. If we were to reuse the same consumer > quota, > > > we > > > > > should also consider the throughput > > > > > the remote storage supports. The throughput supported by remote may > > be > > > > > less/more than the throughput supported > > > > > by local (when using a cloud provider, it depends on the plan opted > > by > > > > the > > > > > user). The consumer quota has to be carefully > > > > > set considering both local and remote throughput. Instead, if we > > have a > > > > > separate quota, it makes things much simpler > > > > > for the user, since they already know what throughput their remote > > > > storage > > > > > supports. > > > > > > > > > > (Also, thanks for pointing out. I will update the KIP based on the > > > > > discussion) > > > > > > > > > > Regards, > > > > > Abhijeet. > > > > > > > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao > > > > wrote: > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > Sorry for the late reply. It seems that you haven't
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi Jun, Thanks for pointing it out. It makes sense to me. We can have the following metrics instead. What do you think? - remote-(fetch|copy)-throttle-time-avg (The average time in ms remote fetches/copies was throttled by a broker) - remote-(fetch|copy)-throttle-time--max (The maximum time in ms remote fetches/copies was throttled by a broker) These are similar to fetch-throttle-time-avg and fetch-throttle-time-max metrics we have for Kafak Consumers? The Avg and Max are computed over the (sliding) window as defined by the configuration metrics.sample.window.ms on the server. (Also, I will update the config and metric names to be consistent) Regards. On Thu, Feb 29, 2024 at 2:51 AM Jun Rao wrote: > Hi, Abhijeet, > > Thanks for the reply. > > The issue with recording the throttle time as a gauge is that it's > transient. If the metric is not read immediately, the recorded value could > be reset to 0. The admin won't realize that throttling has happened. > > For client quotas, the throttle time is tracked as the average > throttle-time per user/client-id. This makes the metric less transient. > > Also, the configs use read/write whereas the metrics use fetch/copy. Could > we make them consistent? > > Jun > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar > > wrote: > > > Hi Jun, > > > > Clarified the meaning of the two metrics. Also updated the KIP. > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime -> > The > > duration of time required at a given moment to bring the observed fetch > > rate within the allowed limit, by preventing further reads. > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime -> > The > > duration of time required at a given moment to bring the observed remote > > copy rate within the allowed limit, by preventing further copies. > > > > Regards. > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao > wrote: > > > > > Hi, Abhijeet, > > > > > > Thanks for the explanation. Makes sense to me now. > > > > > > Just a minor comment. Could you document the exact meaning of the > > following > > > two metrics? For example, is the time accumulated? If so, is it from > the > > > start of the broker or over some window? > > > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime > > > > > > Jun > > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar < > > abhijeet.cse@gmail.com > > > > > > > wrote: > > > > > > > Hi Jun, > > > > > > > > The existing quota system for consumers is designed to throttle the > > > > consumer if it exceeds the allowed fetch rate. > > > > The additional quota we want to add works on the broker level. If the > > > > broker-level remote read quota is being > > > > exceeded, we prevent additional reads from the remote storage but do > > not > > > > prevent local reads for the consumer. > > > > If the consumer has specified other partitions to read, which can be > > > served > > > > from local, it can continue to read those > > > > partitions. To elaborate more, we make a check for quota exceeded if > we > > > > know a segment needs to be read from > > > > remote. If the quota is exceeded, we simply skip the partition and > move > > > to > > > > other segments in the fetch request. > > > > This way consumers can continue to read the local data as long as > they > > > have > > > > not exceeded the client-level quota. > > > > > > > > Also, when we choose the appropriate consumer-level quota, we would > > > > typically look at what kind of local fetch > > > > throughput is supported. If we were to reuse the same consumer quota, > > we > > > > should also consider the throughput > > > > the remote storage supports. The throughput supported by remote may > be > > > > less/more than the throughput supported > > > > by local (when using a cloud provider, it depends on the plan opted > by > > > the > > > > user). The consumer quota has to be carefully > > > > set considering both local and remote throughput. Instead, if we > have a > > > > separate quota, it makes things much simpler > > > > for the user, since they already know what throughput their remote > > > storage > > > > supports. > > > > > > > > (Also, thanks for pointing out. I will update the KIP based on the > > > > discussion) > > > > > > > > Regards, > > > > Abhijeet. > > > > > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao > > > wrote: > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > Sorry for the late reply. It seems that you haven't updated the KIP > > > based > > > > > on the discussion? One more comment. > > > > > > > > > > 11. Currently, we already have a quota system for both the > producers > > > and > > > > > consumers. I can understand why we need an additional > > > > > remote.log.manager.write.quota.default quota. For example, when > tier > > > > > storage is enabled for the first time, there could be a lot of > > segments > > > > > that need
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi, Abhijeet, Thanks for the reply. The issue with recording the throttle time as a gauge is that it's transient. If the metric is not read immediately, the recorded value could be reset to 0. The admin won't realize that throttling has happened. For client quotas, the throttle time is tracked as the average throttle-time per user/client-id. This makes the metric less transient. Also, the configs use read/write whereas the metrics use fetch/copy. Could we make them consistent? Jun On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar wrote: > Hi Jun, > > Clarified the meaning of the two metrics. Also updated the KIP. > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime -> The > duration of time required at a given moment to bring the observed fetch > rate within the allowed limit, by preventing further reads. > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime -> The > duration of time required at a given moment to bring the observed remote > copy rate within the allowed limit, by preventing further copies. > > Regards. > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao wrote: > > > Hi, Abhijeet, > > > > Thanks for the explanation. Makes sense to me now. > > > > Just a minor comment. Could you document the exact meaning of the > following > > two metrics? For example, is the time accumulated? If so, is it from the > > start of the broker or over some window? > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime > > > > Jun > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar < > abhijeet.cse@gmail.com > > > > > wrote: > > > > > Hi Jun, > > > > > > The existing quota system for consumers is designed to throttle the > > > consumer if it exceeds the allowed fetch rate. > > > The additional quota we want to add works on the broker level. If the > > > broker-level remote read quota is being > > > exceeded, we prevent additional reads from the remote storage but do > not > > > prevent local reads for the consumer. > > > If the consumer has specified other partitions to read, which can be > > served > > > from local, it can continue to read those > > > partitions. To elaborate more, we make a check for quota exceeded if we > > > know a segment needs to be read from > > > remote. If the quota is exceeded, we simply skip the partition and move > > to > > > other segments in the fetch request. > > > This way consumers can continue to read the local data as long as they > > have > > > not exceeded the client-level quota. > > > > > > Also, when we choose the appropriate consumer-level quota, we would > > > typically look at what kind of local fetch > > > throughput is supported. If we were to reuse the same consumer quota, > we > > > should also consider the throughput > > > the remote storage supports. The throughput supported by remote may be > > > less/more than the throughput supported > > > by local (when using a cloud provider, it depends on the plan opted by > > the > > > user). The consumer quota has to be carefully > > > set considering both local and remote throughput. Instead, if we have a > > > separate quota, it makes things much simpler > > > for the user, since they already know what throughput their remote > > storage > > > supports. > > > > > > (Also, thanks for pointing out. I will update the KIP based on the > > > discussion) > > > > > > Regards, > > > Abhijeet. > > > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao > > wrote: > > > > > > > Hi, Abhijeet, > > > > > > > > Sorry for the late reply. It seems that you haven't updated the KIP > > based > > > > on the discussion? One more comment. > > > > > > > > 11. Currently, we already have a quota system for both the producers > > and > > > > consumers. I can understand why we need an additional > > > > remote.log.manager.write.quota.default quota. For example, when tier > > > > storage is enabled for the first time, there could be a lot of > segments > > > > that need to be written to the remote storage, even though there is > no > > > > increase in the produced data. However, I am not sure about an > > > > additional remote.log.manager.read.quota.default. The KIP says that > the > > > > reason is "This may happen when the majority of the consumers start > > > reading > > > > from the earliest offset of their respective Kafka topics.". However, > > > this > > > > can happen with or without tier storage and the current quota system > > for > > > > consumers is designed for solving this exact problem. Could you > explain > > > the > > > > usage of this additional quota? > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar < > > > > abhijeet.cse@gmail.com> > > > > wrote: > > > > > > > > > Comments inline > > > > > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao > > > wrote: > > > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > > > Thanks for the KIP. A few
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi Jun, Clarified the meaning of the two metrics. Also updated the KIP. kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime -> The duration of time required at a given moment to bring the observed fetch rate within the allowed limit, by preventing further reads. kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime -> The duration of time required at a given moment to bring the observed remote copy rate within the allowed limit, by preventing further copies. Regards. On Wed, Feb 28, 2024 at 12:28 AM Jun Rao wrote: > Hi, Abhijeet, > > Thanks for the explanation. Makes sense to me now. > > Just a minor comment. Could you document the exact meaning of the following > two metrics? For example, is the time accumulated? If so, is it from the > start of the broker or over some window? > > kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime > > Jun > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar > > wrote: > > > Hi Jun, > > > > The existing quota system for consumers is designed to throttle the > > consumer if it exceeds the allowed fetch rate. > > The additional quota we want to add works on the broker level. If the > > broker-level remote read quota is being > > exceeded, we prevent additional reads from the remote storage but do not > > prevent local reads for the consumer. > > If the consumer has specified other partitions to read, which can be > served > > from local, it can continue to read those > > partitions. To elaborate more, we make a check for quota exceeded if we > > know a segment needs to be read from > > remote. If the quota is exceeded, we simply skip the partition and move > to > > other segments in the fetch request. > > This way consumers can continue to read the local data as long as they > have > > not exceeded the client-level quota. > > > > Also, when we choose the appropriate consumer-level quota, we would > > typically look at what kind of local fetch > > throughput is supported. If we were to reuse the same consumer quota, we > > should also consider the throughput > > the remote storage supports. The throughput supported by remote may be > > less/more than the throughput supported > > by local (when using a cloud provider, it depends on the plan opted by > the > > user). The consumer quota has to be carefully > > set considering both local and remote throughput. Instead, if we have a > > separate quota, it makes things much simpler > > for the user, since they already know what throughput their remote > storage > > supports. > > > > (Also, thanks for pointing out. I will update the KIP based on the > > discussion) > > > > Regards, > > Abhijeet. > > > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao > wrote: > > > > > Hi, Abhijeet, > > > > > > Sorry for the late reply. It seems that you haven't updated the KIP > based > > > on the discussion? One more comment. > > > > > > 11. Currently, we already have a quota system for both the producers > and > > > consumers. I can understand why we need an additional > > > remote.log.manager.write.quota.default quota. For example, when tier > > > storage is enabled for the first time, there could be a lot of segments > > > that need to be written to the remote storage, even though there is no > > > increase in the produced data. However, I am not sure about an > > > additional remote.log.manager.read.quota.default. The KIP says that the > > > reason is "This may happen when the majority of the consumers start > > reading > > > from the earliest offset of their respective Kafka topics.". However, > > this > > > can happen with or without tier storage and the current quota system > for > > > consumers is designed for solving this exact problem. Could you explain > > the > > > usage of this additional quota? > > > > > > Thanks, > > > > > > Jun > > > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar < > > > abhijeet.cse@gmail.com> > > > wrote: > > > > > > > Comments inline > > > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao > > wrote: > > > > > > > > > Hi, Abhijeet, > > > > > > > > > > Thanks for the KIP. A few comments. > > > > > > > > > > 10. remote.log.manager.write.quota.default: > > > > > 10.1 For other configs, we > > > > > use replica.alter.log.dirs.io.max.bytes.per.second. To be > consistent, > > > > > perhaps this can be sth like > > > > remote.log.manager.write.max.bytes.per.second. > > > > > > > > > > > > > This makes sense, we can rename the following configs to be > consistent. > > > > > > > > Remote.log.manager.write.quota.default -> > > > > remote.log.manager.write.max.bytes.per.second > > > > > > > > Remote.log.manager.read.quota.default -> > > > > remote.log.manager.read.max.bytes.per.second. > > > > > > > > > > > > > > > > > 10.2 Could we list the new metrics associated with the new quota. > > > > > > > > > > > > > We will add the following metrics as mentioned in the other response. > > > >
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi, Abhijeet, Thanks for the explanation. Makes sense to me now. Just a minor comment. Could you document the exact meaning of the following two metrics? For example, is the time accumulated? If so, is it from the start of the broker or over some window? kafka.log.remote:type=RemoteLogManager, name=RemoteFetchThrottleTime kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime Jun On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar wrote: > Hi Jun, > > The existing quota system for consumers is designed to throttle the > consumer if it exceeds the allowed fetch rate. > The additional quota we want to add works on the broker level. If the > broker-level remote read quota is being > exceeded, we prevent additional reads from the remote storage but do not > prevent local reads for the consumer. > If the consumer has specified other partitions to read, which can be served > from local, it can continue to read those > partitions. To elaborate more, we make a check for quota exceeded if we > know a segment needs to be read from > remote. If the quota is exceeded, we simply skip the partition and move to > other segments in the fetch request. > This way consumers can continue to read the local data as long as they have > not exceeded the client-level quota. > > Also, when we choose the appropriate consumer-level quota, we would > typically look at what kind of local fetch > throughput is supported. If we were to reuse the same consumer quota, we > should also consider the throughput > the remote storage supports. The throughput supported by remote may be > less/more than the throughput supported > by local (when using a cloud provider, it depends on the plan opted by the > user). The consumer quota has to be carefully > set considering both local and remote throughput. Instead, if we have a > separate quota, it makes things much simpler > for the user, since they already know what throughput their remote storage > supports. > > (Also, thanks for pointing out. I will update the KIP based on the > discussion) > > Regards, > Abhijeet. > > On Tue, Feb 27, 2024 at 2:49 AM Jun Rao wrote: > > > Hi, Abhijeet, > > > > Sorry for the late reply. It seems that you haven't updated the KIP based > > on the discussion? One more comment. > > > > 11. Currently, we already have a quota system for both the producers and > > consumers. I can understand why we need an additional > > remote.log.manager.write.quota.default quota. For example, when tier > > storage is enabled for the first time, there could be a lot of segments > > that need to be written to the remote storage, even though there is no > > increase in the produced data. However, I am not sure about an > > additional remote.log.manager.read.quota.default. The KIP says that the > > reason is "This may happen when the majority of the consumers start > reading > > from the earliest offset of their respective Kafka topics.". However, > this > > can happen with or without tier storage and the current quota system for > > consumers is designed for solving this exact problem. Could you explain > the > > usage of this additional quota? > > > > Thanks, > > > > Jun > > > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar < > > abhijeet.cse@gmail.com> > > wrote: > > > > > Comments inline > > > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao > wrote: > > > > > > > Hi, Abhijeet, > > > > > > > > Thanks for the KIP. A few comments. > > > > > > > > 10. remote.log.manager.write.quota.default: > > > > 10.1 For other configs, we > > > > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent, > > > > perhaps this can be sth like > > > remote.log.manager.write.max.bytes.per.second. > > > > > > > > > > This makes sense, we can rename the following configs to be consistent. > > > > > > Remote.log.manager.write.quota.default -> > > > remote.log.manager.write.max.bytes.per.second > > > > > > Remote.log.manager.read.quota.default -> > > > remote.log.manager.read.max.bytes.per.second. > > > > > > > > > > > > > 10.2 Could we list the new metrics associated with the new quota. > > > > > > > > > > We will add the following metrics as mentioned in the other response. > > > *RemoteFetchThrottleTime* - The amount of time needed to bring the > > observed > > > remote fetch rate within the read quota > > > *RemoteCopyThrottleTime *- The amount of time needed to bring the > > observed > > > remote copy rate with the copy quota. > > > > > > 10.3 Is this dynamically configurable? If so, could we document the > > impact > > > > to tools like kafka-configs.sh and AdminClient? > > > > > > > > > > Yes, the quotas are dynamically configurable. We will add them as > Dynamic > > > Broker Configs. Users will be able to change > > > the following configs using either kafka-configs.sh or AdminClient by > > > specifying the config name and new value. For eg. > > > > > > Using kafka-configs.sh > > > > > > bin/kafka-configs.sh --bootstrap-server > --entity-type > > > brokers
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi Jun, The existing quota system for consumers is designed to throttle the consumer if it exceeds the allowed fetch rate. The additional quota we want to add works on the broker level. If the broker-level remote read quota is being exceeded, we prevent additional reads from the remote storage but do not prevent local reads for the consumer. If the consumer has specified other partitions to read, which can be served from local, it can continue to read those partitions. To elaborate more, we make a check for quota exceeded if we know a segment needs to be read from remote. If the quota is exceeded, we simply skip the partition and move to other segments in the fetch request. This way consumers can continue to read the local data as long as they have not exceeded the client-level quota. Also, when we choose the appropriate consumer-level quota, we would typically look at what kind of local fetch throughput is supported. If we were to reuse the same consumer quota, we should also consider the throughput the remote storage supports. The throughput supported by remote may be less/more than the throughput supported by local (when using a cloud provider, it depends on the plan opted by the user). The consumer quota has to be carefully set considering both local and remote throughput. Instead, if we have a separate quota, it makes things much simpler for the user, since they already know what throughput their remote storage supports. (Also, thanks for pointing out. I will update the KIP based on the discussion) Regards, Abhijeet. On Tue, Feb 27, 2024 at 2:49 AM Jun Rao wrote: > Hi, Abhijeet, > > Sorry for the late reply. It seems that you haven't updated the KIP based > on the discussion? One more comment. > > 11. Currently, we already have a quota system for both the producers and > consumers. I can understand why we need an additional > remote.log.manager.write.quota.default quota. For example, when tier > storage is enabled for the first time, there could be a lot of segments > that need to be written to the remote storage, even though there is no > increase in the produced data. However, I am not sure about an > additional remote.log.manager.read.quota.default. The KIP says that the > reason is "This may happen when the majority of the consumers start reading > from the earliest offset of their respective Kafka topics.". However, this > can happen with or without tier storage and the current quota system for > consumers is designed for solving this exact problem. Could you explain the > usage of this additional quota? > > Thanks, > > Jun > > On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar < > abhijeet.cse@gmail.com> > wrote: > > > Comments inline > > > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao wrote: > > > > > Hi, Abhijeet, > > > > > > Thanks for the KIP. A few comments. > > > > > > 10. remote.log.manager.write.quota.default: > > > 10.1 For other configs, we > > > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent, > > > perhaps this can be sth like > > remote.log.manager.write.max.bytes.per.second. > > > > > > > This makes sense, we can rename the following configs to be consistent. > > > > Remote.log.manager.write.quota.default -> > > remote.log.manager.write.max.bytes.per.second > > > > Remote.log.manager.read.quota.default -> > > remote.log.manager.read.max.bytes.per.second. > > > > > > > > > 10.2 Could we list the new metrics associated with the new quota. > > > > > > > We will add the following metrics as mentioned in the other response. > > *RemoteFetchThrottleTime* - The amount of time needed to bring the > observed > > remote fetch rate within the read quota > > *RemoteCopyThrottleTime *- The amount of time needed to bring the > observed > > remote copy rate with the copy quota. > > > > 10.3 Is this dynamically configurable? If so, could we document the > impact > > > to tools like kafka-configs.sh and AdminClient? > > > > > > > Yes, the quotas are dynamically configurable. We will add them as Dynamic > > Broker Configs. Users will be able to change > > the following configs using either kafka-configs.sh or AdminClient by > > specifying the config name and new value. For eg. > > > > Using kafka-configs.sh > > > > bin/kafka-configs.sh --bootstrap-server --entity-type > > brokers --entity-default --alter --add-config > > remote.log.manager.write.max.bytes.per.second=52428800 > > > > Using AdminClient > > > > ConfigEntry configEntry = new > > ConfigEntry("remote.log.manager.write.max.bytes.per.second", "5242800"); > > AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, > > AlterConfigOp.OpType.SET); > > List alterConfigOps = > > Collections.singletonList(alterConfigOp); > > > > ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, > > ""); > > Map> updateConfig = > > ImmutableMap.of(resource, alterConfigOps); > > adminClient.incrementalAlterConfigs(updateConfig); > > > > > > > > > > Jun > > > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi, Abhijeet, Sorry for the late reply. It seems that you haven't updated the KIP based on the discussion? One more comment. 11. Currently, we already have a quota system for both the producers and consumers. I can understand why we need an additional remote.log.manager.write.quota.default quota. For example, when tier storage is enabled for the first time, there could be a lot of segments that need to be written to the remote storage, even though there is no increase in the produced data. However, I am not sure about an additional remote.log.manager.read.quota.default. The KIP says that the reason is "This may happen when the majority of the consumers start reading from the earliest offset of their respective Kafka topics.". However, this can happen with or without tier storage and the current quota system for consumers is designed for solving this exact problem. Could you explain the usage of this additional quota? Thanks, Jun On Mon, Feb 12, 2024 at 11:08 AM Abhijeet Kumar wrote: > Comments inline > > On Wed, Dec 6, 2023 at 1:12 AM Jun Rao wrote: > > > Hi, Abhijeet, > > > > Thanks for the KIP. A few comments. > > > > 10. remote.log.manager.write.quota.default: > > 10.1 For other configs, we > > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent, > > perhaps this can be sth like > remote.log.manager.write.max.bytes.per.second. > > > > This makes sense, we can rename the following configs to be consistent. > > Remote.log.manager.write.quota.default -> > remote.log.manager.write.max.bytes.per.second > > Remote.log.manager.read.quota.default -> > remote.log.manager.read.max.bytes.per.second. > > > > > 10.2 Could we list the new metrics associated with the new quota. > > > > We will add the following metrics as mentioned in the other response. > *RemoteFetchThrottleTime* - The amount of time needed to bring the observed > remote fetch rate within the read quota > *RemoteCopyThrottleTime *- The amount of time needed to bring the observed > remote copy rate with the copy quota. > > 10.3 Is this dynamically configurable? If so, could we document the impact > > to tools like kafka-configs.sh and AdminClient? > > > > Yes, the quotas are dynamically configurable. We will add them as Dynamic > Broker Configs. Users will be able to change > the following configs using either kafka-configs.sh or AdminClient by > specifying the config name and new value. For eg. > > Using kafka-configs.sh > > bin/kafka-configs.sh --bootstrap-server --entity-type > brokers --entity-default --alter --add-config > remote.log.manager.write.max.bytes.per.second=52428800 > > Using AdminClient > > ConfigEntry configEntry = new > ConfigEntry("remote.log.manager.write.max.bytes.per.second", "5242800"); > AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, > AlterConfigOp.OpType.SET); > List alterConfigOps = > Collections.singletonList(alterConfigOp); > > ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, > ""); > Map> updateConfig = > ImmutableMap.of(resource, alterConfigOps); > adminClient.incrementalAlterConfigs(updateConfig); > > > > > > Jun > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen wrote: > > > > > Hi Abhijeet, > > > > > > Thanks for the KIP! > > > This is an important feature for tiered storage. > > > > > > Some comments: > > > 1. Will we introduce new metrics for this tiered storage quotas? > > > This is important because the admin can know the throttling status by > > > checking the metrics while the remote write/read are slow, like the > rate > > of > > > uploading/reading byte rate, the throttled time for upload/read... etc. > > > > > > 2. Could you give some examples for the throttling algorithm in the KIP > > to > > > explain it? That will make it much clearer. > > > > > > 3. To solve this problem, we can break down the RLMTask into two > smaller > > > tasks - one for segment upload and the other for handling expired > > segments. > > > How do we handle the situation when a segment is still waiting for > > > offloading while this segment is expired and eligible to be deleted? > > > Maybe it'll be easier to not block the RLMTask when quota exceeded, and > > > just check it each time the RLMTask runs? > > > > > > Thank you. > > > Luke > > > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar < > > abhijeet.cse@gmail.com > > > > > > > wrote: > > > > > > > Hi All, > > > > > > > > I have created KIP-956 for defining read and write quota for tiered > > > > storage. > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas > > > > > > > > Feedback and suggestions are welcome. > > > > > > > > Regards, > > > > Abhijeet. > > > > > > > > > > > > -- > Abhijeet. >
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Comments inline On Wed, Dec 6, 2023 at 1:12 AM Jun Rao wrote: > Hi, Abhijeet, > > Thanks for the KIP. A few comments. > > 10. remote.log.manager.write.quota.default: > 10.1 For other configs, we > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent, > perhaps this can be sth like remote.log.manager.write.max.bytes.per.second. > This makes sense, we can rename the following configs to be consistent. Remote.log.manager.write.quota.default -> remote.log.manager.write.max.bytes.per.second Remote.log.manager.read.quota.default -> remote.log.manager.read.max.bytes.per.second. > 10.2 Could we list the new metrics associated with the new quota. > We will add the following metrics as mentioned in the other response. *RemoteFetchThrottleTime* - The amount of time needed to bring the observed remote fetch rate within the read quota *RemoteCopyThrottleTime *- The amount of time needed to bring the observed remote copy rate with the copy quota. 10.3 Is this dynamically configurable? If so, could we document the impact > to tools like kafka-configs.sh and AdminClient? > Yes, the quotas are dynamically configurable. We will add them as Dynamic Broker Configs. Users will be able to change the following configs using either kafka-configs.sh or AdminClient by specifying the config name and new value. For eg. Using kafka-configs.sh bin/kafka-configs.sh --bootstrap-server --entity-type brokers --entity-default --alter --add-config remote.log.manager.write.max.bytes.per.second=52428800 Using AdminClient ConfigEntry configEntry = new ConfigEntry("remote.log.manager.write.max.bytes.per.second", "5242800"); AlterConfigOp alterConfigOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET); List alterConfigOps = Collections.singletonList(alterConfigOp); ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, ""); Map> updateConfig = ImmutableMap.of(resource, alterConfigOps); adminClient.incrementalAlterConfigs(updateConfig); > > Jun > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen wrote: > > > Hi Abhijeet, > > > > Thanks for the KIP! > > This is an important feature for tiered storage. > > > > Some comments: > > 1. Will we introduce new metrics for this tiered storage quotas? > > This is important because the admin can know the throttling status by > > checking the metrics while the remote write/read are slow, like the rate > of > > uploading/reading byte rate, the throttled time for upload/read... etc. > > > > 2. Could you give some examples for the throttling algorithm in the KIP > to > > explain it? That will make it much clearer. > > > > 3. To solve this problem, we can break down the RLMTask into two smaller > > tasks - one for segment upload and the other for handling expired > segments. > > How do we handle the situation when a segment is still waiting for > > offloading while this segment is expired and eligible to be deleted? > > Maybe it'll be easier to not block the RLMTask when quota exceeded, and > > just check it each time the RLMTask runs? > > > > Thank you. > > Luke > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar < > abhijeet.cse@gmail.com > > > > > wrote: > > > > > Hi All, > > > > > > I have created KIP-956 for defining read and write quota for tiered > > > storage. > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas > > > > > > Feedback and suggestions are welcome. > > > > > > Regards, > > > Abhijeet. > > > > > > -- Abhijeet.
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Comments inline On Tue, Nov 28, 2023 at 3:50 PM Luke Chen wrote: > > Hi Abhijeet, > > Thanks for the KIP! > This is an important feature for tiered storage. > > Some comments: > 1. Will we introduce new metrics for this tiered storage quotas? > This is important because the admin can know the throttling status by > checking the metrics while the remote write/read are slow, like the rate of > uploading/reading byte rate, the throttled time for upload/read... etc. > Good point. When the fetches/copies on a broker are getting throttled, we'll notice that the broker's fetch and copy activities stabilize according to the predefined limit. We plan to add the following extra metrics to indicate the throttled time for fetches/copies, similar to the local fetch throttle metrics. *RemoteFetchThrottleTime* - The amount of time needed to bring the observed remote fetch rate within the read quota *RemoteCopyThrottleTime *- The amount of time needed to bring the observed remote copy rate with the copy quota. > 2. Could you give some examples for the throttling algorithm in the KIP to > explain it? That will make it much clearer. > The QuotaManagers look something like: class RlmWriteQuotaManager { public void updateQuota(Quota newQuota) {// Implementation} public boolean isQuotaExceeded() {// Implementation} public void record(Double value) {// Implementation} } class RlmReadQuotaManager { public void updateQuota(Quota newQuota) {// Implementation} public boolean isQuotaExceeded() {// Implementation} public void record(Double value) {// Implementation} } *Read Throttling* RemoteLogReader will capture the read rate once it has fetched the remote log segment to serve the fetch request. Changes would look something like below. class RemoteLogReader { @Override public Void call() { // ... quotaManager.record(result.fetchDataInfo.map(info -> (double) info.records.sizeInBytes()).orElse(0.0)); callback.accept(result); } } Read throttling will be carried out by the ReplicaManager. When it gets offset out of range error, it will check for quota getting exhausted before returning delayedRemoteStorageFetch info. def handleOffsetOutOfRangeError() { // ... if (params.isFromFollower) { // ... } else { // ... val fetchDataInfo = if (isRemoteLogReadQuotaExceeded) { info("Read quota exceeded. Returning empty remote storage fetch info") FetchDataInfo( LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY, delayedRemoteStorageFetch = None ) } else { // Regular case when quota is not exceeded FetchDataInfo(LogOffsetMetadata(fetchInfo.fetchOffset), MemoryRecords.EMPTY, delayedRemoteStorageFetch = Some( RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp, fetchInfo, fetchIsolation))) } //... } When the read quota is exceeded, we return an empty remote storage fetch info. We do not want to send an exception in the LogReadResult response (like we do in other cases when we send UnknownOffsetMetadata), because then it is classified as an error in reading data, and a response is immediately sent back to the client. Instead, we want that we should be able to serve data for other topic partitions in the fetch request via delayed fetch if required (when sending an immediate response, delayed fetch is skipped). Also, immediately sending a response would make the consumer retry again immediately, which may run into quota exceeded situation again and thus get it into a loop. *Write Throttling* RemoteLogManager.java private ReentrantLock writeQuotaManagerLock = new ReentrantLock(true); private Condition lockCondition = writeQuotaManagerLock.newCondition(); private void copyLogSegment { // ... writeQuotaManagerLock.lock(); try { while (rlmWriteQuotaManager.isQuotaExceeded) { // Quota exceeded, waiting for quota to be available lockCondition.await(timeout, TimeUnit.MILLISECONDS); } rlmWriteQuotamanager.record(segment.log.sizeInBytes()) // Signal waiting threads to check the quota locakCondition.signalAll() } finally { writeQuotaManagerLock.unlock(); } // Actual copy operation } Before beginning to copy the segment to remote, the RLMTask checks if the quota is exceeded. If it has, the task waits for a specified period before rechecking the quota status. Once the RLMTask confirms that the quota is available, it records the size of the log it plans to upload with the quota manager before proceeding with the segment upload. This step is crucial to ensure that the quota is accurately updated and prevents other RLMTasks from mistakenly assuming that quota space is available for them as well. > 3. To solve this problem, we can break down the RLMTask into two smaller > tasks - one for segment upload and the other for handling expired segments. > How do we handle the situation when a segment is still waiting for > offloading while this segment is
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Yes, it is part of V1. I have added it now. I will follow up on the KIP comments in the next couple of days and try to close the review in the next few weeks. Regards. On Thu, Feb 1, 2024 at 7:40 PM Francois Visconte wrote: > > Hi, > > I see that the ticket has been left untouched since a while now. > Should it be included in the tiered storage v1? > We've observed that lacking a way to throttle uploads to tiered storage has > a major impact on > producers and consumers when tiered storage access recovers (starving disk > IOps/throughput or CPU). > For this reason, I think this is an important feature and possibly worth > including in v1? > > Regards, > > > On Tue, Dec 5, 2023 at 8:43 PM Jun Rao wrote: > > > Hi, Abhijeet, > > > > Thanks for the KIP. A few comments. > > > > 10. remote.log.manager.write.quota.default: > > 10.1 For other configs, we > > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent, > > perhaps this can be sth like remote.log.manager.write.max.bytes.per.second. > > 10.2 Could we list the new metrics associated with the new quota. > > 10.3 Is this dynamically configurable? If so, could we document the impact > > to tools like kafka-configs.sh and AdminClient? > > > > Jun > > > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen wrote: > > > > > Hi Abhijeet, > > > > > > Thanks for the KIP! > > > This is an important feature for tiered storage. > > > > > > Some comments: > > > 1. Will we introduce new metrics for this tiered storage quotas? > > > This is important because the admin can know the throttling status by > > > checking the metrics while the remote write/read are slow, like the rate > > of > > > uploading/reading byte rate, the throttled time for upload/read... etc. > > > > > > 2. Could you give some examples for the throttling algorithm in the KIP > > to > > > explain it? That will make it much clearer. > > > > > > 3. To solve this problem, we can break down the RLMTask into two smaller > > > tasks - one for segment upload and the other for handling expired > > segments. > > > How do we handle the situation when a segment is still waiting for > > > offloading while this segment is expired and eligible to be deleted? > > > Maybe it'll be easier to not block the RLMTask when quota exceeded, and > > > just check it each time the RLMTask runs? > > > > > > Thank you. > > > Luke > > > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar < > > abhijeet.cse@gmail.com > > > > > > > wrote: > > > > > > > Hi All, > > > > > > > > I have created KIP-956 for defining read and write quota for tiered > > > > storage. > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas > > > > > > > > Feedback and suggestions are welcome. > > > > > > > > Regards, > > > > Abhijeet. > > > > > > > > > -- Abhijeet.
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi, I see that the ticket has been left untouched since a while now. Should it be included in the tiered storage v1? We've observed that lacking a way to throttle uploads to tiered storage has a major impact on producers and consumers when tiered storage access recovers (starving disk IOps/throughput or CPU). For this reason, I think this is an important feature and possibly worth including in v1? Regards, On Tue, Dec 5, 2023 at 8:43 PM Jun Rao wrote: > Hi, Abhijeet, > > Thanks for the KIP. A few comments. > > 10. remote.log.manager.write.quota.default: > 10.1 For other configs, we > use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent, > perhaps this can be sth like remote.log.manager.write.max.bytes.per.second. > 10.2 Could we list the new metrics associated with the new quota. > 10.3 Is this dynamically configurable? If so, could we document the impact > to tools like kafka-configs.sh and AdminClient? > > Jun > > On Tue, Nov 28, 2023 at 2:19 AM Luke Chen wrote: > > > Hi Abhijeet, > > > > Thanks for the KIP! > > This is an important feature for tiered storage. > > > > Some comments: > > 1. Will we introduce new metrics for this tiered storage quotas? > > This is important because the admin can know the throttling status by > > checking the metrics while the remote write/read are slow, like the rate > of > > uploading/reading byte rate, the throttled time for upload/read... etc. > > > > 2. Could you give some examples for the throttling algorithm in the KIP > to > > explain it? That will make it much clearer. > > > > 3. To solve this problem, we can break down the RLMTask into two smaller > > tasks - one for segment upload and the other for handling expired > segments. > > How do we handle the situation when a segment is still waiting for > > offloading while this segment is expired and eligible to be deleted? > > Maybe it'll be easier to not block the RLMTask when quota exceeded, and > > just check it each time the RLMTask runs? > > > > Thank you. > > Luke > > > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar < > abhijeet.cse@gmail.com > > > > > wrote: > > > > > Hi All, > > > > > > I have created KIP-956 for defining read and write quota for tiered > > > storage. > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas > > > > > > Feedback and suggestions are welcome. > > > > > > Regards, > > > Abhijeet. > > > > > >
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi, Abhijeet, Thanks for the KIP. A few comments. 10. remote.log.manager.write.quota.default: 10.1 For other configs, we use replica.alter.log.dirs.io.max.bytes.per.second. To be consistent, perhaps this can be sth like remote.log.manager.write.max.bytes.per.second. 10.2 Could we list the new metrics associated with the new quota. 10.3 Is this dynamically configurable? If so, could we document the impact to tools like kafka-configs.sh and AdminClient? Jun On Tue, Nov 28, 2023 at 2:19 AM Luke Chen wrote: > Hi Abhijeet, > > Thanks for the KIP! > This is an important feature for tiered storage. > > Some comments: > 1. Will we introduce new metrics for this tiered storage quotas? > This is important because the admin can know the throttling status by > checking the metrics while the remote write/read are slow, like the rate of > uploading/reading byte rate, the throttled time for upload/read... etc. > > 2. Could you give some examples for the throttling algorithm in the KIP to > explain it? That will make it much clearer. > > 3. To solve this problem, we can break down the RLMTask into two smaller > tasks - one for segment upload and the other for handling expired segments. > How do we handle the situation when a segment is still waiting for > offloading while this segment is expired and eligible to be deleted? > Maybe it'll be easier to not block the RLMTask when quota exceeded, and > just check it each time the RLMTask runs? > > Thank you. > Luke > > On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar > > wrote: > > > Hi All, > > > > I have created KIP-956 for defining read and write quota for tiered > > storage. > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas > > > > Feedback and suggestions are welcome. > > > > Regards, > > Abhijeet. > > >
Re: [DISCUSS] KIP-956: Tiered Storage Quotas
Hi Abhijeet, Thanks for the KIP! This is an important feature for tiered storage. Some comments: 1. Will we introduce new metrics for this tiered storage quotas? This is important because the admin can know the throttling status by checking the metrics while the remote write/read are slow, like the rate of uploading/reading byte rate, the throttled time for upload/read... etc. 2. Could you give some examples for the throttling algorithm in the KIP to explain it? That will make it much clearer. 3. To solve this problem, we can break down the RLMTask into two smaller tasks - one for segment upload and the other for handling expired segments. How do we handle the situation when a segment is still waiting for offloading while this segment is expired and eligible to be deleted? Maybe it'll be easier to not block the RLMTask when quota exceeded, and just check it each time the RLMTask runs? Thank you. Luke On Wed, Nov 22, 2023 at 6:27 PM Abhijeet Kumar wrote: > Hi All, > > I have created KIP-956 for defining read and write quota for tiered > storage. > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas > > Feedback and suggestions are welcome. > > Regards, > Abhijeet. >
[DISCUSS] KIP-956: Tiered Storage Quotas
Hi All, I have created KIP-956 for defining read and write quota for tiered storage. https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas Feedback and suggestions are welcome. Regards, Abhijeet.