Agree that reusing the correlation id won't work well. I want to point out that the issue is larger than just unsupported request types: any error that happens before we figure out the request leads to silently closing the connection. Requests larger than socket.request.max.bytes, or IPs with more connections than their quota, will suffer the same fate.
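The pre-parse failure can be made concrete with a minimal Python sketch of request framing. It assumes the usual layout of an int32 size prefix followed by a header of ApiKey (int16), ApiVersion (int16) and CorrelationId (int32). The function name and MAX_REQUEST_BYTES (a stand-in for socket.request.max.bytes) are hypothetical, and unlike a broker the sketch works on a complete buffer. A frame rejected on its size prefix is rejected before the correlation id is read, so in this sketch there is no id to put on an error response.

```python
import struct
from typing import Optional, Tuple

# Hypothetical stand-in for the broker's socket.request.max.bytes setting.
MAX_REQUEST_BYTES = 100 * 1024 * 1024


def parse_request_header(
    frame: bytes, max_bytes: int = MAX_REQUEST_BYTES
) -> Optional[Tuple[int, int, int]]:
    """Return (api_key, api_version, correlation_id) from a size-prefixed frame,
    or None if the frame is rejected before the header is read."""
    if len(frame) < 4:
        return None  # not even a complete size prefix
    (size,) = struct.unpack(">i", frame[:4])
    if size < 0 or size > max_bytes:
        # Rejected on the size prefix alone: the correlation id is never read,
        # so there is nothing to address an error response to.
        return None
    body = frame[4:]
    if size < 8 or len(body) < size:
        return None  # header too short, or frame incomplete
    # Header layout: ApiKey int16, ApiVersion int16, CorrelationId int32 (ClientId follows).
    api_key, api_version, correlation_id = struct.unpack(">hhi", body[:8])
    return api_key, api_version, correlation_id


# Usage: a request with api key 10 and correlation id 42.
header = struct.pack(">hhih", 10, 0, 42, 2) + b"me"  # ClientId as int16 length + bytes
frame = struct.pack(">i", len(header)) + header
print(parse_request_header(frame))               # (10, 0, 42)
print(parse_request_header(frame, max_bytes=4))  # None
```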
I think that adding a protocol-level success/fail flag, independent of errors in the underlying functionality, makes a lot of sense, although it's perhaps a bit too late to do it now.

Gwen

On Fri, Jan 16, 2015 at 9:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yeah, I second what Magnus is saying: reusing the correlation id to mean something else isn't the right thing. This would actually break the Java clients as well, which are very pedantic about getting the right correlation in each response.

-Jay

On Fri, Jan 16, 2015 at 2:34 AM, Magnus Edenhill <mag...@edenhill.se> wrote:

It is a bit hacky. With multiple requests in transit on the wire, how do you know which request wasn't supported if the returned correlationId is -1? Requests are currently responded to in the order they were sent, so the corrId is not yet critical, but will this always be true? One could imagine that status requests such as Metadata, pings, etc. will be handled during the lifetime of a previous, not yet responded-to, long-running request such as Fetch or Produce.

I think the least intrusive method would be to return an empty message with just the ResponseHeader (with the proper CorrId). This will lead to a parse error on the client, immediately (albeit indirectly) pointing out to the application/user that there is a protocol compatibility error.

Updated clients will handle this gracefully as an UnsupportedRequest error internally, while old clients fail in arbitrary, but most likely properly handled, ways (data shortage while parsing).

My two cents,
Magnus

2015-01-16 3:30 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

The "hacky" method that Dana suggests does not sound too hacky to me, actually.
Since such a scenario will only happen when either 1) new clients talk to an older server or 2) older clients talk to a new server with some APIs deprecated, and "correlation_id" has always been set to meaningful numbers before, old clients do not check its validity. So we only need to upgrade the clients once, at ANY time, to handle the "-1 correlation_id"; before that happens, the old client will just throw "SerializationException" instead of "ERROR: Closing socket for" in both cases, which gives them similar semantics. For such a situation we do not need to require a version bump.

On Mon, Jan 12, 2015 at 6:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

I totally agree but I still think we shouldn't do it. :-)

That change would cause the reimplementation of ALL existing Kafka clients. (You can't choose not to implement a new protocol version, or else we are committing to keeping the old version supported both ways on the server forever.)

The problem it fixes is fairly minor: clients that want to adaptively detect APIs. In general I agree this isn't easy to do, but I also don't think it is really recommended. I think it is probably better for clients to just implement against reasonably conservative versions and trust us not to break them going forward. That is simpler and less likely to break.

We also haven't actually addressed the issue originally brought up that led to not doing it: how to interpret and set the top-level error in the presence of nested errors (which exception does the client throw, and when?). This is kind of icky too, though probably preferable if we were starting over. I see either of these alternatives as imperfect, but changing now has a high cost and doesn't really address a top-50 pain point.
But I do agree that KIPs would really help draw attention to these kinds of decisions as we make them, and help us get serious about sticking with them without having that kind of "it sucks but..." feeling.

-Jay

On Mon, Jan 12, 2015 at 5:57 PM, Joe Stein <joe.st...@stealth.ly> wrote:

There are benefits of moving the error code to the response header.

1) I think it is the right thing to do from an implementation perspective. It makes the most sense. You send a request and you get back a response. The response tells you something is wrong in the header.

2) With such a large change we can make sure we have our solution to these issues (see the other thread on Compatibility and KIP) set up and in place moving forward. If we can make such a large change, then smaller ones should work well too. We could even use this one change as a way to best flesh out how we want to implement it, preserving functionality AND adding the new response format. When we release 0.8.3 (assuming this was in there), developers can read KIP-1 (or whatever) and decide if they want to support the version bump required; if not, then fine, keep working with 0.8.2 and you are good to go.

/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Mon, Jan 12, 2015 at 8:37 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yeah, adding it to the metadata request probably makes sense.
What you describe, making it a per-broker field, is technically correct, since each broker could be on a different software version. But I wonder if it might not be more usable to just give back a single list of API versions. This will be more compact and also easier for a client to interpret. An easy implementation would be for the broker that answers the metadata request to just give back whatever versions it supports. A slightly better implementation would be for each broker to register what it supports in ZK and have the responding broker give back the intersection (i.e. the APIs supported by all brokers).

Since the broker actually supports multiple versions at the same time, this will need to be in the form [ApiId [ApiVersion]].

-Jay

On Mon, Jan 12, 2015 at 5:19 PM, Dana Powers <dana.pow...@rd.io> wrote:

Perhaps a bit hacky, but you could also reserve a specific correlationId (maybe -1) to represent errors and send back to the client an UnknownAPIResponse like:

Response => -1 UnknownAPIResponse

UnknownAPIResponse => originalCorrelationId errorCode

The benefit here would be that it does not break the current API, and current clients should be able to continue operating as usual as long as they ignore unknown correlationIds and don't use the reserved id. Clients that want to catch unknown-API errors can handle -1 correlationIds and dispatch as needed.
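Dana's reserved-id scheme could be dispatched on the client roughly as follows. This is a minimal sketch, assuming the response payload starts with the int32 correlation id and that originalCorrelationId and errorCode are int32 and int16 (the proposal does not give their types). The InFlight class and the error code 99 in the usage are hypothetical.

```python
import struct
from typing import Dict, Tuple, Union

RESERVED_CORRELATION_ID = -1  # the id Dana proposes to reserve for errors


class InFlight:
    """Hypothetical client-side table of requests awaiting a response."""

    def __init__(self) -> None:
        self.pending: Dict[int, str] = {}  # correlation id -> request name
        self.next_id = 0  # counts up from 0, so it never collides with -1 here

    def send(self, request_name: str) -> int:
        cid = self.next_id
        self.next_id += 1
        self.pending[cid] = request_name
        return cid

    def on_response(self, payload: bytes) -> Tuple[str, str, Union[bytes, int]]:
        """payload: response bytes after the size prefix."""
        (cid,) = struct.unpack(">i", payload[:4])
        if cid == RESERVED_CORRELATION_ID:
            # UnknownAPIResponse => originalCorrelationId errorCode
            # (int32 and int16 are assumptions; the proposal gives no types)
            original_cid, error_code = struct.unpack(">ih", payload[4:10])
            return ("unknown_api", self._complete(original_cid), error_code)
        return ("ok", self._complete(cid), payload[4:])

    def _complete(self, cid: int) -> str:
        if cid not in self.pending:
            raise ValueError(f"no request in flight with correlation id {cid}")
        return self.pending.pop(cid)


inflight = InFlight()
meta = inflight.send("Metadata")
coord = inflight.send("ConsumerMetadata")
# The second request is answered first, with the reserved id (99 is a made-up error code).
print(inflight.on_response(struct.pack(">iih", -1, coord, 99)))  # ('unknown_api', 'ConsumerMetadata', 99)
print(inflight.on_response(struct.pack(">i", meta) + b"..."))     # ('ok', 'Metadata', b'...')
```

Because the -1 frame carries the original id, the error can still be matched when responses arrive out of order. The catch is that a client which rejects any correlation id it did not send will fail on the -1 frame, which is Jay's objection at the top of the thread.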
Otherwise, perhaps bump the Metadata Request/Response API and include the supported APIs/versions in the Broker metadata:

Broker => NodeId Host Port [ApiKey ApiVersion] (any number of brokers may be returned)
  NodeId => int32
  Host => string
  Port => int32
  ApiKey => int16
  ApiVersion => int16

So that Metadata includes the list of all supported APIs/versions for each broker in the cluster.

I also echo the problem with continuing the current behavior that Jay pointed out: clients cannot tell the difference between a network error and an unknown-API error. And because network errors require a lot of state resets, that can be a big performance hit. Generally, on a network error a client needs to assume the worst and at least reload cluster metadata. And it is difficult to prevent this from happening every time, because the client doesn't know whether to avoid the API in the future because it is not supported, or keep retrying because the network is flaking.

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

On Mon, Jan 12, 2015 at 3:51 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yeah, I totally agree--using the closed socket to indicate "not supported" does [not] work, since any network error could lead to that.

Arguably we should have a request-level error.
We discussed this at the time we were defining the protocols for 0.8, and the conclusion was not to do that. The reasoning was that since almost all the requests end up having errors at either a per-topic or per-partition level, correctly setting/interpreting the global error is a bit confusing. I.e. if you are implementing a client and a given partition gets an error but there is no global error, what do you do? Likewise, in most cases it is a bit ambiguous how to set the global error on the server side (i.e. if some partitions are unavailable but some are available). The result was that error reporting is defined per-request.

We could change this now, but it would mean bumping compatibility on all the APIs to add the new field, which would be annoying to people, right? I actually agree it might have been better to do it this way, both for this and also to make generic error handling easier, but I'm not sure it is worth such a big break now. The other proposal, introducing a get_protocol_versions() method, seems almost as good for probing for support and is much less invasive. That seems better to me because I think generally clients shouldn't need to do this; they should just build against a minimum Kafka version and trust it will keep working into the future.
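The ambiguity described above shows up as soon as one tries to derive a single global error from per-partition errors. Below is a minimal Python sketch of two plausible policies; both are hypothetical illustrations, not anything the protocol defines, and error codes are shown by name only. For a mixed result they disagree.

```python
from typing import Dict, Tuple

NONE = "NONE"  # no error
PartitionErrors = Dict[Tuple[str, int], str]  # (topic, partition) -> error name


def global_error_any(errors: PartitionErrors) -> str:
    """Policy A: the request failed if any partition failed (first failure wins)."""
    for code in errors.values():
        if code != NONE:
            return code
    return NONE


def global_error_all(errors: PartitionErrors) -> str:
    """Policy B: the request failed only if every partition failed."""
    codes = list(errors.values())
    if codes and all(code != NONE for code in codes):
        return codes[0]
    return NONE


mixed = {("events", 0): NONE, ("events", 1): "LEADER_NOT_AVAILABLE"}
print(global_error_any(mixed))  # LEADER_NOT_AVAILABLE
print(global_error_all(mixed))  # NONE
```

Under policy A, a client that throws on the global error discards the successful result for partition 0; under policy B, it still has to inspect every partition to notice the failure on partition 1.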
-Jay

On Mon, Jan 12, 2015 at 2:24 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> I think #1 may be tricky in practice. The response format is always dictated by the request format, so how do I know if the bytes I got back are a valid response to the given request or are the UnknownRequestResponse?

On the other hand, from the client developer perspective, having to figure out that you are looking at a closed socket because you tried to use an API that wasn't implemented in a specific version can be pretty annoying.

Another way to do it is to move the error_code field (currently implemented in pretty much every single Response schema) to the Response Header, and then we could use it for "meta errors" such as UnknownAPI.

It's a much bigger change than adding a new Request type, but possibly worth it?

> #2 would be a good fix for the problem, I think. This might be a good replacement for the echo API and would probably serve the same purpose (checking if the server is alive).
>
> #3 is a little dangerous because we actually want clients to only pay attention to the protocol versions, which are per-API, not the server version. I.e. we actually don't want the client to do something like check serverVersion.equals("0.8.2"), because we want to be able to release the server at will and have it keep answering protocols in a backwards-compatible way. I.e. a client that uses just the metadata request and the produce request should only care about the versions of these protocols it implements being supported, not about the version of the server or the version of any other protocol it doesn't use. This is the rationale behind versioning the APIs independently rather than having a single protocol version that we would have to bump every time an internal broker-broker protocol changed.
>
> -Jay

On Mon, Jan 12, 2015 at 1:32 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

We ran into similar difficulties, both when trying to get Kafka to use new APIs when available and when testing the wire protocol.

+1 for all three suggestions.

#1 sounds like the bare minimum, but I'm not sure how much it will complicate the clients (now we expect either a response or an Unknown message, and need to be able to distinguish between them from the byte array).

#2 and #3 both make lots of sense.
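Jay's point in #3 (check per-API versions, not a server version string), combined with his suggestion of returning the intersection in the form [ApiId [ApiVersion]], might look like this on the client. A minimal sketch with hypothetical function names and made-up version data; the API keys follow the 0.8 protocol numbering (0 = Produce, 3 = Metadata, 10 = ConsumerMetadata, the "Wrong request type 10" in Dana's stack trace).

```python
from functools import reduce
from typing import Dict, Optional, Set

ApiVersions = Dict[int, Set[int]]  # [ApiId [ApiVersion]] as a dict: api key -> versions


def cluster_versions(per_broker: Dict[int, ApiVersions]) -> ApiVersions:
    """APIs and versions supported by every broker (the intersection)."""
    tables = list(per_broker.values())
    if not tables:
        return {}
    common_keys = reduce(lambda keys, t: keys & set(t), tables[1:], set(tables[0]))
    return {k: set.intersection(*(t[k] for t in tables)) for k in common_keys}


def choose_version(api_key: int, client_versions: Set[int], server: ApiVersions) -> Optional[int]:
    """Highest version both sides support for one API, or None if the API is unusable."""
    usable = client_versions & server.get(api_key, set())
    return max(usable) if usable else None


brokers = {
    1: {0: {0, 1}, 3: {0}, 10: {0}},  # hypothetical newer broker
    2: {0: {0}, 3: {0}},              # hypothetical older broker without API 10
}
common = cluster_versions(brokers)
print(sorted(common))                     # [0, 3]
print(choose_version(0, {0, 1}, common))  # 0
print(choose_version(10, {0}, common))    # None
```

Because the check is per API, a client that only uses Produce and Metadata keeps working against this mixed cluster even though one broker lacks API 10.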
Gwen

On Mon, Jan 12, 2015 at 1:15 PM, Dana Powers <dana.pow...@rd.io> wrote:

Hi all -- continuing on the compatibility discussion:

I've found that it is very difficult to identify when a server does not recognize an API (I'm using kafka-python to submit wire-protocol requests). For example, when I send a ConsumerMetadataRequest to an 0.8.1.1 server, I get a closed socket *[stacktrace below]. The server raises an error internally, but does not send any meaningful response. I'm not sure whether this is the intended behavior, but when maintaining clients in an ecosystem of multiple server versions with different API support, it would be great to have a way to determine what the server supports and what it does not.

Some suggestions:

(1) An UnknownAPIResponse that is returned for any API or API version request that is unsupported.
(2) A server metadata API to get the list of supported APIs and/or API versions supported.
(3) A server metadata API to get the published version of the server (0.8.2 v. 0.8.1.1, etc).

Thoughts?

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

*stacktrace:
```
[2015-01-12 13:03:55,719] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 10
	at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:57)
	at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:53)
	at kafka.network.Processor.read(SocketServer.scala:353)
	at kafka.network.Processor.run(SocketServer.scala:245)
	at java.lang.Thread.run(Thread.java:722)
```
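For suggestion (1), one option raised in the thread is Magnus's header-only response. A minimal Python sketch of both ends, assuming a response frame of int32 size, int32 correlation id and then the body, and assuming every supported response has a non-empty body. handle_request, read_response, the handler table and UnsupportedRequest are hypothetical names; an 0.8.1.1 broker instead raises and closes the socket, as the stack trace above shows.

```python
import struct
from typing import Callable, Dict

Handler = Callable[[bytes], bytes]  # request bytes after the first header fields -> response body


class UnsupportedRequest(Exception):
    """Hypothetical client-side error for a header-only response."""


def handle_request(request: bytes, handlers: Dict[int, Handler]) -> bytes:
    """Server side: request is the frame after its size prefix; returns a size-prefixed response."""
    api_key, _api_version, correlation_id = struct.unpack(">hhi", request[:8])
    handler = handlers.get(api_key)
    # Unknown api key: answer with just the ResponseHeader instead of closing the socket.
    body = handler(request[8:]) if handler is not None else b""
    payload = struct.pack(">i", correlation_id) + body
    return struct.pack(">i", len(payload)) + payload


def read_response(frame: bytes, expected_correlation_id: int) -> bytes:
    """Client side: return the response body, or raise if the request was not supported."""
    (size,) = struct.unpack(">i", frame[:4])
    payload = frame[4:4 + size]
    (correlation_id,) = struct.unpack(">i", payload[:4])
    if correlation_id != expected_correlation_id:
        raise ValueError(f"unexpected correlation id {correlation_id}")
    body = payload[4:]
    if not body:
        raise UnsupportedRequest(f"request {correlation_id} was not supported by the server")
    return body


handlers = {3: lambda req: struct.pack(">i", 0)}  # hypothetical handler returning a dummy 4-byte body
request = struct.pack(">hhih", 10, 0, 7, 0)  # api key 10, correlation id 7, empty ClientId
try:
    read_response(handle_request(request, handlers), 7)
except UnsupportedRequest as exc:
    print(exc)  # request 7 was not supported by the server
```

Old clients would hit the data shortage Magnus mentions when parsing the empty body, while updated clients can map it to a distinct error instead of treating it as a network failure.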