Hey Todd,

Yeah it is kind of weird to do the quota check after taking a request, but
since the penalty is applied during that request and it just delays you to
the right rate, I think it isn't exactly wrong. I admit it is weird, though.
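
To make "delays you to the right rate" concrete, here is a rough sketch of one way the delay could be computed. The names (ThrottleDelay, delayMs) and the formula are my own assumptions for illustration, not code from Kafka or from the KIP: hold the response just long enough that the bytes seen so far, spread over the elapsed time plus the delay, average out to the quota.

```java
// Hypothetical helper; the names and the formula are illustrative assumptions.
final class ThrottleDelay {
    private ThrottleDelay() {}

    /**
     * Returns how many milliseconds to hold the response so that bytesInWindow,
     * spread over (elapsedMs + delay), averages out to quotaBytesPerSec.
     * Returns 0 when the client is at or under its quota.
     */
    static long delayMs(long bytesInWindow, long elapsedMs, long quotaBytesPerSec) {
        if (quotaBytesPerSec <= 0) {
            throw new IllegalArgumentException("quota must be positive");
        }
        // Time the observed bytes should have taken at the quota rate, rounded up.
        long targetMs = (bytesInWindow * 1000L + quotaBytesPerSec - 1) / quotaBytesPerSec;
        return Math.max(0L, targetMs - elapsedMs);
    }

    public static void main(String[] args) {
        // 15 MB sent in 10 s against a 1 MB/s quota: hold the response for 5 s.
        System.out.println(delayMs(15_000_000L, 10_000L, 1_000_000L)); // prints 5000
    }
}
```

Because the request has already been processed by the time this runs, the delay only ever applies to the response, which is why doing the check after taking the request still lands the client on the right rate.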

What you say about closing the connection makes sense. The issue is that
our current model for connections is totally transient. The clients are
supposed to handle any kind of transient connection loss and just
re-establish. So basically all existing clients would likely just retry all
the same whether you closed the connection or not, so at the moment there
would be no way to know a retried request is actually a retry.

Your point about the REST proxy is a good one; I don't think we had
considered that. Currently the Java producer has a single client.id
for all requests, so the REST proxy would be a single client. But what you
actually want is for the original sender to be the client. This is
technically very hard to do because the producer client batches records
from all senders together into one request, so the only way to get the
client id right would be to make a new producer for each REST proxy client,
and that would mean a lot of memory and connections. This needs thought; I'm
not sure what solution there is.

I am not 100% convinced we need to obey the request timeout. The
configuration issue actually isn't a problem because the request timeout is
sent with the request, so the broker already knows it even without a
handshake. However, the question is: if someone sets a pathologically low
request timeout, do we need to obey it? And if so, won't that mean we can't
quota them? I claim the answer is no! I think we should redefine request
timeout to mean "replication timeout", which is actually what it is today.
Even today if you interact with a slow server it may take longer than that
timeout (say because the fs write queues up for a long-ass time). I think
we need a separate client timeout which should be fairly long and unlikely
to be hit (default to 30 secs or something).

-Jay

On Tue, Mar 10, 2015 at 10:12 AM, Todd Palino <tpal...@gmail.com> wrote:

> Thanks, Jay. On the interface, I agree with Aditya (and you, I believe)
> that we don't need to expose the public API contract at this time, but
> structuring the internal logic to allow for it later with low cost is a
> good idea.
>
> Glad you explained the thoughts on where to hold requests. While my gut
> reaction is to not like processing a produce request that is over quota, it
> makes sense to do it that way if you are going to have your quota action be
> a delay.
>
> On the delay, I see your point on the bootstrap cases. However, one of the
> places I differ, and part of the reason that I prefer the error, is that I
> would never allow a producer who is over quota to resend a produce request.
> A producer should identify itself at the start of its connection, and at
> that point if it is over quota, the broker would return an error and close
> the connection. The same goes for a consumer. I'm a fan, in general, of
> pushing all error cases and handling down to the client and doing as little
> special work to accommodate those cases on the broker side as possible.
>
> A case to consider here is what does this mean for REST endpoints to Kafka?
> Are you going to hold the HTTP connection open as well? Is the endpoint
> going to queue and hold requests?
>
> I think the point that we can only delay as long as the producer's timeout
> is a valid one, especially given that we do not have any means for the
> broker and client to negotiate settings, whether that is timeouts or
> message sizes or anything else. There are a lot of things that you have to
> know when setting up a Kafka client about what your settings should be,
> when much of that should be provided for in the protocol handshake. It's
> not as critical in an environment like ours, where we have central
> configuration for most clients, but we still see issues with it. I think
> being able to have the client and broker negotiate a minimum timeout
> allowed would make the delay more palatable.
>
> I'm still not sure it's the right solution, and that we're not just going
> with what's fast and cheap as opposed to what is good (or right). But given
> the details of where to hold the request, I have less of a concern with the
> burden on the broker.
>
> -Todd
>
>
> On Mon, Mar 9, 2015 at 5:01 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey Todd,
> >
> > Nice points, let me try to respond:
> >
> > Plugins
> >
> > Yeah let me explain what I mean about plugins. The contracts that matter
> > for us are public contracts, i.e. the protocol, the binary format, stuff
> > in zk, and the various plug-in apis we expose. Adding an internal interface
> > later will not be hard--the quota check is going to be done in 2-6 places
> > in the code which would need to be updated, all internal to the broker.
> >
> > The challenge with making things pluggable up front is that the policy is
> > usually fairly trivial to plug in but each policy requires different
> > inputs--the number of partitions, different metrics, etc. Once we give a
> > public api it is hard to add to it without breaking the original contract
> > and hence breaking everyone's plugins. So if we do this we want to get it
> > right early if possible. In any case I think whether we want to design a
> > pluggable api or just improve a single implementation, the work we need
> > to do is the same: brainstorm the set of use cases the feature has and
> > then figure out the gap in our proposed implementation that leaves some
> > use case uncovered. Once we have these specific cases we can try to figure
> > out if that could be solved with a plugin or by improving our default
> > proposal.
> >
> > Enforcement
> >
> > I started out arguing your side (immediate error), but I have switched to
> > preferring delay. Here is what convinced me, let's see if it moves you.
> >
> > First, the delay quota need not hold onto any request data. The produce
> > response can be delayed after the request is completed and the fetch can
> > be delayed prior to the fetch being executed. So no state needs to be
> > maintained in memory, other than a single per-connection token. This is a
> > really important implementation detail for large scale usage that I
> > didn't realize at first. I would agree that maintaining a request per
> > connection in memory is a non-starter for an environment with tens of
> > thousands of connections.
> >
> > The second argument is that I think this really expands the use cases
> > where the quotas can be applicable.
> >
> > The use case I have heard people talk about is event collection from
> > apps. In this use case the app is directly sending data at a more or less
> > steady state and never really has a performance spike unless the app has a
> > bug or the application itself experiences more traffic. So in this case
> > you should actually never hit the quota, and if you do, the data is going
> > to be dropped whether it is dropped by the server with an error or by the
> > client. These use cases will never block the app (which would be
> > dangerous) since the client is always non-blocking and drops data when its
> > buffer is full rather than blocking. I agree that for this use case either
> > a server-side delay or a server-side error is reasonable--the pro of a
> > server-side delay is that it doesn't require special client handling, the
> > pro of the server-side error is that it is more transparent.
> >
> > But now consider non-steady-state use cases. Here I am thinking of:
> > 1. Data load from Hadoop
> > 2. MM load into a cluster with live usage
> > 3. Database changelog capture
> > 4. Samza
> >
> > Each of these has a case where it is "catching up" or otherwise slammed
> > by load from the source system:
> > 1. An M/R job dumps a ton of data all at once
> > 2. MM when catching up after some downtime
> > 3. Database changelog will have a load phase when populating data for the
> > first time
> > 4. Samza when restoring state or catching up after fail-over
> >
> > In each of these cases you want the consumer or producer to go as fast as
> > possible but not impact the other users of the cluster. In these cases
> > you are actually using the quotas totally differently. In the app event
> > capture use case the quota was more like a safety valve that you expected
> > to never hit. However in these cases I just listed you fully expect to hit
> > and remain at the quota for extended periods of time and that will be
> > totally normal.
> >
> > These are the cases where throttling throughput is better than sending an
> > error. If we send an error then any producer who doesn't configure enough
> > retries is going to start losing data. Further, even if you set infinite
> > retries the retry itself is going to mean resending the data over and
> > over until you get a non-error. This is bad because in a period of high
> > load you are then going to be incurring more network load as lots of
> > producers start retrying (this isn't a problem on the consumer because the
> > fetch request is small but is an issue on the producer).
> >
> > I take your point about the potential danger of slowing down a producing
> > app that is configured to block. But actually this danger is no different
> > than what will happen if it exceeds the node capacity now--when that
> > happens requests will start getting slow and the app will block. The only
> > difference is that that limit is now lower than when the node's capacity
> > is totally exhausted. So I don't think that is a new danger.
> >
> > Exposing quota usage
> >
> > I agree we should make this available, good use of this feature obviously
> > means knowing how close you are to your quota before you hit it.
> >
> > Cheers,
> >
> > -Jay
> >
> > On Mon, Mar 9, 2015 at 10:34 AM, Todd Palino <tpal...@gmail.com> wrote:
> >
> > > First, a couple notes on this...
> > >
> > > 3 - I generally agree with the direction of not pre-optimizing. However,
> > > in this case I'm concerned about the calculation of the cost of doing
> > > plugins now vs. trying to refactor the code to do it later. It would seem
> > > to me that doing it up front will have less friction. If we wait to do
> > > plugins later, it will probably mean changing a lot of related code which
> > > will be significantly more work. We've spent a lot of time talking about
> > > various implementations, and I think it is not unreasonable to believe
> > > that what one group wants initially is not going to solve even most
> > > cases, as it will vary by use case.
> > >
> > > 4 - I really disagree with this. Slowing down a request means that
> > > you're going to hold onto it in the broker. This takes up resources and
> > > time, and is generally not the way other services handle quota
> > > violations. In addition you are causing potential problems with the
> > > clients by taking a call that's supposed to return as quickly as possible
> > > and making it take a long time. This increases latency and deprives the
> > > client of the ability to make good decisions about what to do. By sending
> > > an error back to the client you inform them of what the problem is, and
> > > you allow the client to make an intelligent decision, such as queuing to
> > > send later, sending to another resource, or handling anything from their
> > > upstreams differently.
> > >
> > > You're absolutely right that throwing back an immediate error has the
> > > potential to turn a quota violation into a different problem for a badly
> > > behaved client. But OS and upstream networking tools can see a problem
> > > based on a layer 4 issue (rapidly reconnecting client) rather than layers
> > > above. Out of the options provided, I think A is the correct choice. B
> > > seems to be the most work (you have the delay, and the client still has
> > > to handle errors and backoff), and C is what I disagree with doing.
> > >
> > > I would also like to see a provision for allowing the client to query
> > > its quota status within the protocol. I think we should allow for a
> > > request (or information within an existing response) where the client can
> > > ask what its current quota status is. This will allow for the clients to
> > > manage their quotas, and it will allow for emitting metrics on the client
> > > side for quota status (rather than relying on the server-side metrics,
> > > which tends to put the responsibility in the wrong place).
> > >
> > >
> > > -Todd
> > >
> > >
> > > On Sun, Mar 8, 2015 at 10:52 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Hey Adi,
> > > >
> > > > Great write-up. Here are some comments:
> > > >
> > > > 1. I don't think you need a way to disable quotas on a per-client
> > > > basis; that is just the equivalent of setting the quota to be infinite,
> > > > right?
> > > >
> > > > 2. I agree that the configuration problem is a general part of doing
> > > > dynamic configuration, and it is right to separate that into the config
> > > > KIP. But Joe's proposal doesn't provide nearly what you need in its
> > > > current form--it doesn't even handle client-id based configuration, let
> > > > alone the notification mechanism you would need to update your
> > > > quota--so we really need to spell out explicitly how that KIP is going
> > > > to solve this problem.
> > > >
> > > > 3. Custom quota implementations: let's do this later. Pluggability
> > > > comes with a high cost and we want to try really hard to avoid it. So in
> > > > the future if we have a really solid case for an alternative quota
> > > > approach let's see if we can't improve the current approach and stick
> > > > with one good implementation. If we really can't then let's add a plugin
> > > > system. I think doing it now is premature.
> > > >
> > > > 4. I think the ideal quota action from the user's point of view is just
> > > > to slow down the writer or reader transparently to match their capacity
> > > > allocation. Let's try to see if we can make that work.
> > > >
> > > > I think immediate error can be ruled out entirely because it depends on
> > > > the client properly backing off. In cases where they don't we may
> > > > actually make things worse. Given the diversity of clients I think
> > > > proper backoff is probably not going to happen.
> > > >
> > > > The only downside to just delaying the request that was pointed out was
> > > > that if the delay exceeded the request timeout the user might retry.
> > > > This is true but it applies to any approach that delays requests (both B
> > > > and C). I think with any sane request timeout and quota the per request
> > > > delay we induce will be way lower (otherwise you would be hitting the
> > > > timeout all the time just due to Linux I/O variance, in which case you
> > > > can't really complain).
> > > >
> > > > 5. We need to explain the relationship between the quota stuff in the
> > > > metrics package and this. We need to either remove that stuff or use it.
> > > > We can't have two quota things. Since quotas fundamentally apply to
> > > > windowed metrics, I would suggest making whatever improvements that
> > > > package needs to be usable for quotas.
> > > >
> > > > 6. I don't think the quota manager interface is really what we need if
> > > > I'm understanding it correctly. You give a method
> > > >   <T extends RequestOrResponse> boolean check(T request);
> > > > But how would you implement this method? It seems like it would
> > > > basically just be a switch statement internally, with a different check
> > > > for each request type. So this is a pretty terrible object oriented api,
> > > > right? It seems like what we will be doing is taking code that would
> > > > otherwise just be in the request handling flow, and moving it into this
> > > > method, with a bunch of instanceof checks?
> > > >
> > > > I think what we need is just a delayqueue and a background thread that
> > > > sends the delayed responses (we were calling it a purgatory but it
> > > > isn't, it is just a timeout based delay--there are no watchers or keys
> > > > or any of that).
> > > >
> > > > Let's rename the QuotaManager to RequestThrottler and have it just have
> > > > a single method:
> > > > class RequestThrottler {
> > > >   sendDelayedResponse(response, delay, timeunit)
> > > > }
> > > > Internally it will put the response into the delay queue and there will
> > > > be a background thread that sends out those responses after the delay
> > > > elapses.
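
A minimal sketch of the delay queue plus background thread described in the quoted text, using java.util.concurrent.DelayQueue. Only the names RequestThrottler and sendDelayedResponse come from the thread. Everything else is an assumption made for illustration: a Runnable stands in for "send this response on the network", and the DelayedResponse shape and the shutdown method are made up.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch only: the Runnable callback and DelayedResponse layout are assumptions.
final class RequestThrottler {

    /** A response plus the absolute time (System.nanoTime scale) at which it may be sent. */
    private static final class DelayedResponse implements Delayed {
        final Runnable send;
        final long dueNanos;

        DelayedResponse(Runnable send, long delay, TimeUnit unit) {
            this.send = send;
            this.dueNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedResponse> queue = new DelayQueue<>();
    private final Thread sender;

    RequestThrottler() {
        sender = new Thread(() -> {
            try {
                while (true) {
                    // take() blocks until the earliest delay has elapsed.
                    queue.take().send.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        }, "request-throttler");
        sender.setDaemon(true);
        sender.start();
    }

    /** Queues the response; the background thread sends it once the delay elapses. */
    void sendDelayedResponse(Runnable send, long delay, TimeUnit unit) {
        queue.put(new DelayedResponse(send, delay, unit));
    }

    void shutdown() {
        sender.interrupt();
    }
}
```

Note that nothing here is keyed or watched: the queue orders entries purely by expiry time, which matches the point that this is a timeout-based delay and not a purgatory.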
> > > >
> > > > So usage in KafkaApis would look like:
> > > >    try {
> > > >      quotaMetric.record(newVal)
> > > >    } catch (QuotaException e) {
> > > >      requestThrottler.sendDelayedResponse(new DelayedResponse(...), ...)
> > > >      return
> > > >    }
> > > >
> > > > The advantage of this is that the logic of what metric is being checked
> > > > and the logic of how to appropriately correct the response, both of
> > > > which will be specific to each request, now remain in KafkaApis where
> > > > they belong. The throttler just delays the sending of the response for
> > > > the appropriate time and has no per-request logic whatsoever.
> > > >
> > > > 7. We need to think through and state the exact algorithm for how we
> > > > will assign delays to requests for a use case that is over its quota.
> > > > That is closely tied to how we calculate the metric used. Here would be
> > > > a bad approach we should not use:
> > > > a. measure in a 30 second window.
> > > > b. when we have hit the cap in that window, delay for the remainder of
> > > > the 30 seconds
> > > > As you can imagine with this bad algorithm you might then use all server
> > > > resources for 5 seconds, then suddenly assign a 25 second delay to the
> > > > next request from that client, then the window would reset and this
> > > > would repeat.
> > > > The quota package is already doing a good job of the windowed metrics,
> > > > but we'll want to integrate the backoff calculation with that algorithm
> > > > (assuming that is what we are using).
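
To put numbers on the fixed-window scheme that the quoted text calls a bad approach, here is a tiny sketch of its delay rule. The class and method names are hypothetical. It is shown only to illustrate the behavior to avoid, not as a proposal.

```java
// Illustration of the "delay for the remainder of the window" rule described
// above as one NOT to use. Names are hypothetical.
final class FixedWindowDelay {
    private FixedWindowDelay() {}

    /** Delay assigned to the next request once the cap is hit inside a fixed window. */
    static long delayMs(long windowMs, long elapsedInWindowMs, boolean capHit) {
        if (elapsedInWindowMs < 0 || elapsedInWindowMs > windowMs) {
            throw new IllegalArgumentException("elapsed time must lie within the window");
        }
        return capHit ? windowMs - elapsedInWindowMs : 0L;
    }

    public static void main(String[] args) {
        // Cap exhausted 5 s into a 30 s window: the next request waits 25 s.
        System.out.println(delayMs(30_000L, 5_000L, true)); // prints 25000
    }
}
```

The delay depends only on where in the window the cap was hit, not on how far over the rate the client is, so the client alternates between full speed and a long stall each time the window resets.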
> > > >
> > > > Cheers,
> > > >
> > > > -Jay
> > > >
> > > > On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <aaurad...@linkedin.com.invalid> wrote:
> > > >
> > > > > Posted a KIP for quotas in kafka.
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
> > > > >
> > > > > Appreciate any feedback.
> > > > >
> > > > > Aditya
> > > > >
> > > >
> > >
> >
>
