Jun,

Thanks for the explanation. I believe my understanding is close to what
you have written. I still think that this approach is somewhat limiting
(what if you add a field of type int in V1 and then remove another field of
type int in V2? The overloaded V0 and V2 constructors would end up with the
same signature and would not compile), but in any case we need to follow
this approach.
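
To make the overloading concern concrete, here is a minimal sketch. The
class and its fields (timeoutMs, retries) are hypothetical and chosen only
for illustration; the named factory methods are just one possible way
around the clash, not something we have agreed on.

```java
// Hypothetical request used only for illustration; the field names are made up.
final class VersionedRequestSketch {
    static final int DEFAULT_RETRIES = 0; // default for a field that V0 does not have
    static final int NO_TIMEOUT = -1;     // placeholder for a field removed in V2

    final int versionId;
    final int timeoutMs; // present in V0 and V1, removed in V2
    final int retries;   // added in V1

    private VersionedRequestSketch(int versionId, int timeoutMs, int retries) {
        this.versionId = versionId;
        this.timeoutMs = timeoutMs;
        this.retries = retries;
    }

    // As public constructors, V0 would take (int timeoutMs) and V2 would take
    // (int retries): identical signatures, so the compiler rejects the pair.
    // Named factories keep one entry point per version without that clash.
    static VersionedRequestSketch v0(int timeoutMs) {
        return new VersionedRequestSketch(0, timeoutMs, DEFAULT_RETRIES);
    }

    static VersionedRequestSketch v1(int timeoutMs, int retries) {
        return new VersionedRequestSketch(1, timeoutMs, retries);
    }

    static VersionedRequestSketch v2(int retries) {
        return new VersionedRequestSketch(2, NO_TIMEOUT, retries);
    }
}
```

With constructors only, V0 and V2 cannot both exist here; with factories the
caller writes VersionedRequestSketch.v0(500) or VersionedRequestSketch.v2(3).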

Ok, then I believe I will have to remove all "error"-constructors which
were added as part of this sub-task. Instead, in
getErrorResponse(versionId, throwable) I will pattern-match on the version
and build the right response version by calling the constructor with the
right arguments.
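
Roughly, I have something like the following in mind. This is only a
sketch: SomeResponse, its fields, the versionId it stores and the
placeholder error code are all hypothetical, and the real mapping from
exception to error code is left out.

```java
// Hypothetical sketch of getErrorResponse dispatching on the request version.
final class ErrorResponseSketch {
    static final short UNKNOWN_ERROR_CODE = -1; // placeholder, not a real mapping

    // Hypothetical response with one constructor per version.
    static final class SomeResponse {
        final int versionId;
        final short errorCode;
        final String errorMessage; // added in V1, defaulted for V0

        // V0 constructor: no error message in the schema, so use a default.
        SomeResponse(short errorCode) {
            this.versionId = 0;
            this.errorCode = errorCode;
            this.errorMessage = "";
        }

        // V1 constructor: carries the additional field.
        SomeResponse(short errorCode, String errorMessage) {
            this.versionId = 1;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    // Pick the constructor that matches the version the client asked for.
    static SomeResponse getErrorResponse(int versionId, Throwable e) {
        switch (versionId) {
            case 0:
                return new SomeResponse(UNKNOWN_ERROR_CODE);
            case 1:
                return new SomeResponse(UNKNOWN_ERROR_CODE, String.valueOf(e.getMessage()));
            default:
                throw new IllegalArgumentException("Unsupported version: " + versionId);
        }
    }
}
```

An unknown version fails fast here instead of silently falling back to the
latest schema.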

Also, one small issue with this approach. Currently we create
MetadataRequest from a Cluster object. As you remember, in KIP-4 we
planned to evolve it to include topic-level configs. We agreed to add
this to the Cluster class directly. In this case it will break our pattern
(constructor per version), since the constructor won't change (it simply
accepts a Cluster instance in both cases).
What is the preferable solution in this case? I can explicitly add a
topicConfigs param to the signature of the V1 constructor, but it seems
inconsistent because Cluster would already encapsulate topicConfigs at
that point.

Thanks,
Andrii Biletskyi

On Mon, May 18, 2015 at 8:28 PM, Jun Rao <j...@confluent.io> wrote:

> Andrii,
>
> Let me clarify a bit how things work now. You can see if this fits your
> need or if it can be improved. If you look at OffsetCommitRequest, our
> convention is the following.
>
> 1. The request object can be constructed from a set of required fields. The
> client typically constructs a request object this way. There will be one
> constructor for each version. The version id is not specified explicitly
> since it's implied by the input parameters. Every time we introduce a new
> version, we will add a new constructor of this form. We will leave the old
> constructors as they are, but mark them as deprecated. Code compiled with
> the old Kafka jar will still work with the new Kafka jar before we actually
> remove the deprecated constructors.
>
> 2. The request object can also be constructed from a struct. This is
> typically used by the broker to convert network bytes into a request
> object. Currently, the constructor looks for specific fields in the struct
> to distinguish which version it corresponds to.
>
> 3. In both cases, the request object always tries to reflect the fields in
> the latest version. We use the following convention when mapping older
> versions to the latest version in the request object:  If a new field is
> added, we try to use a default for the missing field in the old version. If
> a field is removed, we simply ignore it in the old version.
>
> Thanks,
>
> Jun
>
> On Mon, May 18, 2015 at 8:41 AM, Andrii Biletskyi <
> andrii.bilets...@stealth.ly> wrote:
>
> > Hi all,
> >
> > I started working on it and it seems we are going the wrong way.
> > So it appears we need to distinguish constructors by versions in
> > request/response (so we can set correct schema).
> > Request/Response classes will look like:
> >
> > class SomeRequest extends AbstractRequest {
> >    SomeRequest(versionId, <request-specific params >)
> >
> >    // for the latest version
> >    SomeRequest(<request-specific params>)
> > }
> >
> > Now, what if in SomeRequest_V1 we remove some field from the schema?
> > Well, we can leave the constructor signature and simply check
> > programmatically if the set schema contains the given field and, if not,
> > simply ignore it. Thus the mentioned constructor can support V0 & V1.
> > Now, suppose in V2 we add some field - there's nothing we can do, we need
> > to add a new parameter and thus add a new constructor:
> >    SomeRequest(versionId, <request-specific params for V2>)
> >
> > but it's a bit strange - to introduce constructors which may fail only at
> > runtime because you used the wrong constructor for your request version.
> > Overall, in my opinion such an approach suggests we are trying to give
> > clients factory-like methods but implemented as class constructors...
> >
> > Another thing is about the versionId-less constructor (used for the
> > latest version).
> > Again, suppose in V1 we extend the schema with an additional value; we
> > will have to change the constructor without versionId, because this
> > becomes the latest version.
> > But would it be considered backward-compatible? Client code that uses V0
> > and upgrades will not compile in this case.
> >
> > Thoughts?
> >
> > Thanks,
> > Andrii Biletskyi
> >
> >
> >
> >
> > On Fri, May 15, 2015 at 4:31 PM, Andrii Biletskyi <
> > andrii.bilets...@stealth.ly> wrote:
> >
> > > Okay,
> > > I can pick that. I'll create sub-task under KAFKA-2044.
> > >
> > > Thanks,
> > > Andrii Biletskyi
> > >
> > > On Fri, May 15, 2015 at 4:27 PM, Gwen Shapira <gshap...@cloudera.com>
> > > wrote:
> > >
> > > >> Agree that you need version in getErrorResponse too (so you'll get the
> > > >> correct error), which means you'll need to add versionId to
> > > >> constructors of every response object...
> > > >>
> > > >> You'll want to keep two interfaces, one with version and one with
> > > >> CURR_VERSION as default, so you won't need to modify every single
> > > >> call...
> > >>
> > >> On Fri, May 15, 2015 at 4:03 PM, Andrii Biletskyi <
> > >> andrii.bilets...@stealth.ly> wrote:
> > >>
> > >> > Correct, I think we are on the same page.
> > >> > This way we can fix RequestChannel part (where it uses
> > >> > AbstractRequest.getRequest)
> > >> >
> > >> > But would it be okay to add versionId to
> > >> AbstractRequest.getErrorResponse
> > >> > signature too?
> > >> > I'm a bit lost with all those Abstract... objects hierarchy and not
> > sure
> > >> > whether it's
> > >> > the right solution.
> > >> >
> > >> > Thanks,
> > >> > Andrii Biletskyi
> > >> >
> > >> > On Fri, May 15, 2015 at 3:47 PM, Gwen Shapira <
> gshap...@cloudera.com>
> > >> > wrote:
> > >> >
> > >> > > I agree, we currently don't handle versions correctly when
> > >> > > de-serializing into Java objects. This will be an issue for every
> > >> > > req/resp we move to use the Java objects.
> > >> > >
> > >> > > It looks like this requires:
> > >> > > 1. Add versionId parameter to all parse functions in Java req/resp
> > >> > objects
> > >> > > 2. Modify getRequest to pass it along
> > >> > > 3. Modify RequestChannel to get the version out of the header and
> > use
> > >> it
> > >> > > when de-serializing the body.
> > >> > >
> > >> > > Did I get that correct? I want to make sure we are talking about
> the
> > >> same
> > >> > > issue.
> > >> > >
> > >> > > Gwen
> > >> > >
> > >> > > On Fri, May 15, 2015 at 1:45 PM, Andrii Biletskyi <
> > >> > > andrii.bilets...@stealth.ly> wrote:
> > >> > >
> > >> > > > Gwen,
> > >> > > >
> > >> > > > I didn't find this in answers above so apologies if this was
> > >> discussed.
> > >> > > > It's about the way we would like to handle request versions.
> > >> > > >
> > >> > > > As I understood from Jun's answer we generally should try using
> > the
> > >> > same
> > >> > > > java object while evolving the request. I believe the only
> example
> > >> of
> > >> > > > evolved
> > >> > > > request now - OffsetCommitRequest follows this approach.
> > >> > > >
> > >> > > > I'm trying to evolve MetadataRequest to the next version as part
> > of
> > >> > KIP-4
> > >> > > > and not sure current AbstractRequest api (which is a basis for
> > >> ported
> > >> > to
> > >> > > > java requests)
> > >> > > > is sufficient.
> > >> > > >
> > >> > > > The problem is: in order to deserialize bytes into the correct
> > >> > > > object you need to know its version. Suppose KafkaApis serves
> > >> > > > OffsetCommitRequest V0 and V2 (current).
> > >> > > > For such cases the OffsetCommitRequest class has two parse methods:
> > >> > > >
> > >> > > > public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId)
> > >> > > > AND
> > >> > > > public static OffsetCommitRequest parse(ByteBuffer buffer)
> > >> > > >
> > >> > > > The latter one will simply pick the "current" schema version.
> > >> > > > Now AbstractRequest.getRequest, which is the entry point for
> > >> > > > deserializing requests for KafkaApis, matches only on
> > >> > > > RequestHeader.apiKey (and thus uses the second OffsetCommitRequest
> > >> > > > parse method), which is not sufficient because we also need
> > >> > > > RequestHeader.apiVersion in case of an old request version.
> > >> > > >
> > >> > > > The same problem appears in
> > >> AbstractRequest.getErrorResponse(Throwable
> > >> > > e) -
> > >> > > > to construct the right error response object we need to know to
> > >> which
> > >> > > > apiVersion
> > >> > > > to respond.
> > >> > > >
> > >> > > > I think this can affect other tasks under KAFKA-1927 - replacing
> > >> > separate
> > >> > > > RQ/RP,
> > >> > > > so maybe it makes sense to decide/fix it once.
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Andrii Biletskyi
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Wed, Mar 25, 2015 at 12:42 AM, Gwen Shapira <
> > >> gshap...@cloudera.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > OK, I posted a working patch on KAFKA-2044 and
> > >> > > > > https://reviews.apache.org/r/32459/diff/.
> > >> > > > >
> > >> > > > > There are a few decisions there that can be up for discussion
> > >> > > > > (factory method on AbstractRequestResponse, the new handleErrors
> > >> > > > > in request API), but as far as support for o.a.k.common requests
> > >> > > > > in core goes, it does what it needs to do.
> > >> > > > >
> > >> > > > > Please review!
> > >> > > > >
> > >> > > > > Gwen
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > On Tue, Mar 24, 2015 at 10:59 AM, Gwen Shapira <
> > >> > gshap...@cloudera.com>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Hi,
> > >> > > > > >
> > >> > > > > > I uploaded a (very) preliminary patch with my idea.
> > >> > > > > >
> > >> > > > > > One thing that's missing:
> > >> > > > > > RequestResponse had a handleError method that all requests
> > >> > > > > > implemented, typically generating the appropriate error Response
> > >> > > > > > for the request and sending it along. It's used by KafkaApis to
> > >> > > > > > handle all protocol errors for valid requests that are not
> > >> > > > > > handled elsewhere.
> > >> > > > > > AbstractRequestResponse doesn't have such a method.
> > >> > > > > >
> > >> > > > > > I can, of course, add it.
> > >> > > > > > But before I jump into this, I'm wondering if there was another
> > >> > > > > > plan for handling API errors.
> > >> > > > > >
> > >> > > > > > Gwen
> > >> > > > > >
> > >> > > > > > On Mon, Mar 23, 2015 at 6:16 PM, Jun Rao <j...@confluent.io>
> > >> wrote:
> > >> > > > > >
> > >> > > > > >> I think what you are saying is that in RequestChannel, we
> can
> > >> > start
> > >> > > > > >> generating header/body for new request types and leave
> > >> requestObj
> > >> > > > null.
> > >> > > > > >> For
> > >> > > > > >> existing requests, header/body will be null initially.
> > >> Gradually,
> > >> > we
> > >> > > > can
> > >> > > > > >> migrate each type of requests by populating header/body,
> > >> instead
> > >> > of
> > >> > > > > >> requestObj. This makes sense to me since it serves two
> > purposes
> > >> > (1)
> > >> > > > not
> > >> > > > > >> polluting the code base with duplicated request/response
> > >> objects
> > >> > for
> > >> > > > new
> > >> > > > > >> types of requests and (2) allowing the refactoring of
> > existing
> > >> > > > requests
> > >> > > > > to
> > >> > > > > >> be done in smaller pieces.
> > >> > > > > >>
> > >> > > > > >> Could you try that approach and perhaps just migrate one
> > >> existing
> > >> > > > > request
> > >> > > > > >> type (e.g. HeartBeatRequest) as an example? We probably
> need
> > to
> > >> > > rewind
> > >> > > > > the
> > >> > > > > >> buffer after reading the requestId when deserializing the
> > >> header
> > >> > > > (since
> > >> > > > > >> the
> > >> > > > > >> header includes the request id).
> > >> > > > > >>
> > >> > > > > >> Thanks,
> > >> > > > > >>
> > >> > > > > >> Jun
> > >> > > > > >>
> > >> > > > > >> On Mon, Mar 23, 2015 at 4:52 PM, Gwen Shapira <
> > >> > > gshap...@cloudera.com>
> > >> > > > > >> wrote:
> > >> > > > > >>
> > >> > > > > >> > I'm thinking of a different approach, that will not fix
> > >> > > everything,
> > >> > > > > but
> > >> > > > > >> > will allow adding new requests without code duplication
> > (and
> > >> > > > therefore
> > >> > > > > >> > unblock KIP-4):
> > >> > > > > >> >
> > >> > > > > >> > RequestChannel.request currently takes a buffer and parses it
> > >> > > > > >> > into an "old" request object. Since the objects are
> > >> > > > > >> > byte-compatible, we should be able to parse existing requests
> > >> > > > > >> > into both old and new objects. New requests will only be
> > >> > > > > >> > parsed into new objects.
> > >> > > > > >> >
> > >> > > > > >> > Basically:
> > >> > > > > >> > val requestId = buffer.getShort()
> > >> > > > > >> > if (requestId in keyToNameAndDeserializerMap) {
> > >> > > > > >> >    requestObj = RequestKeys.deserializerForKey(requestId)(buffer)
> > >> > > > > >> >    header: RequestHeader = RequestHeader.parse(buffer)
> > >> > > > > >> >    body: Struct =
> > >> > > > > >> >      ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > >> > > > > >> > } else {
> > >> > > > > >> >    requestObj = null
> > >> > > > > >> >    header: RequestHeader = RequestHeader.parse(buffer)
> > >> > > > > >> >    body: Struct =
> > >> > > > > >> >      ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > >> > > > > >> > }
> > >> > > > > >> >
> > >> > > > > >> > This way existing KafkaApis will keep working as normal.
> > The
> > >> new
> > >> > > > Apis
> > >> > > > > >> can
> > >> > > > > >> > implement just the new header/body requests.
> > >> > > > > >> > We'll do the same on the send-side: BoundedByteBufferSend
> > can
> > >> > > have a
> > >> > > > > >> > constructor that takes header/body instead of just a
> > response
> > >> > > > object.
> > >> > > > > >> >
> > >> > > > > >> > Does that make sense?
> > >> > > > > >> >
> > >> > > > > >> > Once we have this in, we can move to:
> > >> > > > > >> > * Adding the missing request/response to the client code
> > >> > > > > >> > * Replacing requests that can be replaced
> > >> > > > > >> >
> > >> > > > > >> > It will also make life easier by having us review and test
> > >> > > > > >> > smaller chunks of work (the existing patch is *huge*, touches
> > >> > > > > >> > nearly every core component and I'm not done yet...)
> > >> > > > > >> >
> > >> > > > > >> > Gwen
> > >> > > > > >> >
> > >> > > > > >> >
> > >> > > > > >> >
> > >> > > > > >> >
> > >> > > > > >> > On Sun, Mar 22, 2015 at 10:24 PM, Jay Kreps <
> > >> > jay.kr...@gmail.com>
> > >> > > > > >> wrote:
> > >> > > > > >> >
> > >> > > > > >> > > Ack, yeah, forgot about that.
> > >> > > > > >> > >
> > >> > > > > >> > > It's not just a difference of wrappers. The server side
> > >> > actually
> > >> > > > > sends
> > >> > > > > >> > the
> > >> > > > > >> > > bytes lazily using FileChannel.transferTo. We need to
> > make
> > >> it
> > >> > > > > >> possible to
> > >> > > > > >> > > carry over that optimization. In some sense what we
> want
> > >> to be
> > >> > > > able
> > >> > > > > >> to do
> > >> > > > > >> > > is set a value to a Send instead of a ByteBuffer.
> > >> > > > > >> > >
> > >> > > > > >> > > Let me try to add that support to the protocol
> definition
> > >> > stuff,
> > >> > > > > will
> > >> > > > > >> > > probably take me a few days to free up time.
> > >> > > > > >> > >
> > >> > > > > >> > > -Jay
> > >> > > > > >> > >
> > >> > > > > >> > > On Sun, Mar 22, 2015 at 7:44 PM, Gwen Shapira <
> > >> > > > > gshap...@cloudera.com>
> > >> > > > > >> > > wrote:
> > >> > > > > >> > >
> > >> > > > > >> > > > In case anyone is still following this thread, I
> need a
> > >> bit
> > >> > of
> > >> > > > > help
> > >> > > > > >> :)
> > >> > > > > >> > > >
> > >> > > > > >> > > > The old FetchResponse.PartitionData included a
> > MessageSet
> > >> > > > object.
> > >> > > > > >> > > > The new FetchResponse.PartitionData includes a
> > >> ByteBuffer.
> > >> > > > > >> > > >
> > >> > > > > >> > > > However, when we read from logs, we return a
> > MessageSet,
> > >> and
> > >> > > as
> > >> > > > > far
> > >> > > > > >> as
> > >> > > > > >> > I
> > >> > > > > >> > > > can see, these can't be converted to ByteBuffers (at
> > >> least
> > >> > not
> > >> > > > > >> without
> > >> > > > > >> > > > copying their data).
> > >> > > > > >> > > >
> > >> > > > > >> > > > Did anyone consider how to reconcile the MessageSets
> > with
> > >> > the
> > >> > > > new
> > >> > > > > >> > > > FetchResponse objects?
> > >> > > > > >> > > >
> > >> > > > > >> > > > Gwen
> > >> > > > > >> > > >
> > >> > > > > >> > > >
> > >> > > > > >> > > > On Sat, Mar 21, 2015 at 6:54 PM, Gwen Shapira <
> > >> > > > > >> gshap...@cloudera.com>
> > >> > > > > >> > > > wrote:
> > >> > > > > >> > > >
> > >> > > > > >> > > > > Note: I'm also treating ZkUtils as if it was a public API
> > >> > > > > >> > > > > (i.e. converting objects that are returned into o.a.k.common
> > >> > > > > >> > > > > equivalents but not changing ZkUtils itself).
> > >> > > > > >> > > > > I know it's not public, but I suspect I'm not the only
> > >> > > > > >> > > > > developer here who has tons of external code that uses it.
> > >> > > > > >> > > > >
> > >> > > > > >> > > > > Gwen
> > >> > > > > >> > > > >
> > >> > > > > >> > > > > On Wed, Mar 18, 2015 at 5:48 PM, Gwen Shapira <
> > >> > > > > >> gshap...@cloudera.com
> > >> > > > > >> > >
> > >> > > > > >> > > > > wrote:
> > >> > > > > >> > > > >
> > >> > > > > >> > > > >> We can't rip them out completely, unfortunately -
> > the
> > >> > > > > >> SimpleConsumer
> > >> > > > > >> > > > uses
> > >> > > > > >> > > > >> them.
> > >> > > > > >> > > > >>
> > >> > > > > >> > > > >> So we'll need conversion at some point. I'll try
> to
> > >> make
> > >> > > the
> > >> > > > > >> > > > >> conversion point "just before hitting a public API
> > >> that
> > >> > we
> > >> > > > > can't
> > >> > > > > >> > > > >> modify", and hopefully it won't look too
> arbitrary.
> > >> > > > > >> > > > >>
> > >> > > > > >> > > > >>
> > >> > > > > >> > > > >>
> > >> > > > > >> > > > >> On Wed, Mar 18, 2015 at 5:24 PM, Jay Kreps <
> > >> > > > > jay.kr...@gmail.com>
> > >> > > > > >> > > wrote:
> > >> > > > > >> > > > >> > I think either approach is okay in the short
> term.
> > >> > > However
> > >> > > > > our
> > >> > > > > >> > goal
> > >> > > > > >> > > > >> should
> > >> > > > > >> > > > >> > be to eventually get rid of that duplicate code,
> > so
> > >> if
> > >> > > you
> > >> > > > > are
> > >> > > > > >> up
> > >> > > > > >> > > for
> > >> > > > > >> > > > >> just
> > >> > > > > >> > > > >> > ripping and cutting that may get us there
> sooner.
> > >> > > > > >> > > > >> >
> > >> > > > > >> > > > >> > -Jay
> > >> > > > > >> > > > >> >
> > >> > > > > >> > > > >> > On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira <
> > >> > > > > >> > > gshap...@cloudera.com>
> > >> > > > > >> > > > >> wrote:
> > >> > > > > >> > > > >> >
> > >> > > > > >> > > > >> >> Thanks!
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >> Another clarification:
> > >> > > > > >> > > > >> >> The Common request/responses use slightly
> > different
> > >> > > > > >> > infrastructure
> > >> > > > > >> > > > >> >> objects: Node instead of Broker, TopicPartition
> > >> > instead
> > >> > > of
> > >> > > > > >> > > > >> >> TopicAndPartition and few more.
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >> I can write utilities to convert Node to Broker
> > to
> > >> > > > minimize
> > >> > > > > >> the
> > >> > > > > >> > > scope
> > >> > > > > >> > > > >> >> of the change.
> > >> > > > > >> > > > >> >> Or I can start replacing Brokers with Nodes
> > across
> > >> the
> > >> > > > > board.
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >> I'm currently taking the second approach - i.e,
> > if
> > >> > > > > >> > MetadataRequest
> > >> > > > > >> > > is
> > >> > > > > >> > > > >> >> now returning Node, I'm changing the entire
> line
> > of
> > >> > > > > >> dependencies
> > >> > > > > >> > to
> > >> > > > > >> > > > >> >> use Nodes instead of broker.
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >> Is this acceptable, or do we want to take a
> more
> > >> > minimal
> > >> > > > > >> approach
> > >> > > > > >> > > for
> > >> > > > > >> > > > >> >> this patch and do a larger replacement as a
> > follow
> > >> up?
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >> Gwen
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >> >> On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps <
> > >> > > > > >> jay.kr...@gmail.com>
> > >> > > > > >> > > > >> wrote:
> > >> > > > > >> > > > >> >> > Great.
> > >> > > > > >> > > > >> >> >
> > >> > > > > >> > > > >> >> > For (3) yeah I think we should just think
> > through
> > >> > the
> > >> > > > > >> > end-to-end
> > >> > > > > >> > > > >> pattern
> > >> > > > > >> > > > >> >> > for these versioned requests since it seems
> > like
> > >> we
> > >> > > will
> > >> > > > > >> have a
> > >> > > > > >> > > > >> number of
> > >> > > > > >> > > > >> >> > them. The serialization code used as you
> > >> described
> > >> > > gets
> > >> > > > us
> > >> > > > > >> to
> > >> > > > > >> > the
> > >> > > > > >> > > > >> right
> > >> > > > > >> > > > >> >> > Struct which the user would then wrap in
> > >> something
> > >> > > like
> > >> > > > > >> > > > >> ProduceRequest.
> > >> > > > > >> > > > >> >> > Presumably there would just be one
> > ProduceRequest
> > >> > that
> > >> > > > > would
> > >> > > > > >> > > > >> internally
> > >> > > > > >> > > > >> >> > fill in things like null or otherwise adapt
> the
> > >> > struct
> > >> > > > to
> > >> > > > > a
> > >> > > > > >> > > usable
> > >> > > > > >> > > > >> >> object.
> > >> > > > > >> > > > >> >> > On the response side we would have the version from the
> > >> > > > > >> > > > >> >> > request to use for correct versioning. One question is
> > >> > > > > >> > > > >> >> > whether this is enough or whether we need to have
> > >> > > > > >> > > > >> >> > switches in KafkaApis to do things like
> > >> > > > > >> > > > >> >> >    if(produceRequest.version == 3)
> > >> > > > > >> > > > >> >> >        // do something
> > >> > > > > >> > > > >> >> >    else
> > >> > > > > >> > > > >> >> >       // do something else
> > >> > > > > >> > > > >> >> >
> > >> > > > > >> > > > >> >> > Basically it would be good to be able to
> write
> > a
> > >> > quick
> > >> > > > > wiki
> > >> > > > > >> > that
> > >> > > > > >> > > > was
> > >> > > > > >> > > > >> like
> > >> > > > > >> > > > >> >> > "how to add or modify a kafka api" that
> > explained
> > >> > the
> > >> > > > > right
> > >> > > > > >> way
> > >> > > > > >> > > to
> > >> > > > > >> > > > >> do all
> > >> > > > > >> > > > >> >> > this.
> > >> > > > > >> > > > >> >> >
> > >> > > > > >> > > > >> >> > I don't think any of this necessarily blocks
> > this
> > >> > > ticket
> > >> > > > > >> since
> > >> > > > > >> > at
> > >> > > > > >> > > > the
> > >> > > > > >> > > > >> >> > moment we don't have tons of versions of
> > requests
> > >> > out
> > >> > > > > there.
> > >> > > > > >> > > > >> >> >
> > >> > > > > >> > > > >> >> > -Jay
> > >> > > > > >> > > > >> >> >
> > >> > > > > >> > > > >> >> > On Wed, Mar 18, 2015 at 2:50 PM, Gwen
> Shapira <
> > >> > > > > >> > > > gshap...@cloudera.com
> > >> > > > > >> > > > >> >
> > >> > > > > >> > > > >> >> wrote:
> > >> > > > > >> > > > >> >> >
> > >> > > > > >> > > > >> >> >> See inline responses:
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps <
> > >> > > > > >> > jay.kr...@gmail.com
> > >> > > > > >> > > >
> > >> > > > > >> > > > >> wrote:
> > >> > > > > >> > > > >> >> >> > Hey Gwen,
> > >> > > > > >> > > > >> >> >> >
> > >> > > > > >> > > > >> >> >> > This makes sense to me.
> > >> > > > > >> > > > >> >> >> >
> > >> > > > > >> > > > >> >> >> > A couple of thoughts, mostly confirming
> what
> > >> you
> > >> > > > said I
> > >> > > > > >> > think:
> > >> > > > > >> > > > >> >> >> >
> > >> > > > > >> > > > >> >> >> >    1. Ideally we would move completely
> over
> > to
> > >> > the
> > >> > > > new
> > >> > > > > >> style
> > >> > > > > >> > > of
> > >> > > > > >> > > > >> >> request
> > >> > > > > >> > > > >> >> >> >    definition for server-side processing,
> > even
> > >> > for
> > >> > > > the
> > >> > > > > >> > > internal
> > >> > > > > >> > > > >> >> >> requests. This
> > >> > > > > >> > > > >> >> >> >    way all requests would have the same
> > >> > header/body
> > >> > > > > >> struct
> > >> > > > > >> > > > stuff.
> > >> > > > > >> > > > >> As
> > >> > > > > >> > > > >> >> you
> > >> > > > > >> > > > >> >> >> say
> > >> > > > > >> > > > >> >> >> >    for the internal requests we can just
> > >> delete
> > >> > the
> > >> > > > > scala
> > >> > > > > >> > > code.
> > >> > > > > >> > > > >> For
> > >> > > > > >> > > > >> >> the
> > >> > > > > >> > > > >> >> >> old
> > >> > > > > >> > > > >> >> >> >    clients they will continue to use their
> > old
> > >> > > > request
> > >> > > > > >> > > > definitions
> > >> > > > > >> > > > >> >> until
> > >> > > > > >> > > > >> >> >> we
> > >> > > > > >> > > > >> >> >> >    eol them. I would propose that new
> > changes
> > >> > will
> > >> > > go
> > >> > > > > >> only
> > >> > > > > >> > > into
> > >> > > > > >> > > > >> the
> > >> > > > > >> > > > >> >> new
> > >> > > > > >> > > > >> >> >> >    request/response objects and the old
> > scala
> > >> > ones
> > >> > > > will
> > >> > > > > >> be
> > >> > > > > >> > > > >> permanently
> > >> > > > > >> > > > >> >> >> stuck
> > >> > > > > >> > > > >> >> >> >    on their current version until
> > >> discontinued.
> > >> > So
> > >> > > > > after
> > >> > > > > >> > this
> > >> > > > > >> > > > >> change
> > >> > > > > >> > > > >> >> >> that old
> > >> > > > > >> > > > >> >> >> >    scala code could be considered frozen.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> SimpleConsumer is obviously stuck with the
> old
> > >> > > > > >> > request/response.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> The Producers can be converted to the common
> > >> > > > > >> request/response
> > >> > > > > >> > > > >> without
> > >> > > > > >> > > > >> >> >> breaking compatibility.
> > >> > > > > >> > > > >> >> >> I think we should do this (even though it
> > >> requires
> > >> > > > > fiddling
> > >> > > > > >> > with
> > >> > > > > >> > > > >> >> >> additional network serialization code), just
> > so
> > >> we
> > >> > > can
> > >> > > > > >> throw
> > >> > > > > >> > the
> > >> > > > > >> > > > old
> > >> > > > > >> > > > >> >> >> ProduceRequest away.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> Does that make sense?
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> >    2. I think it would be reasonable to
> keep
> > >> all
> > >> > > the
> > >> > > > > >> > requests
> > >> > > > > >> > > > >> under
> > >> > > > > >> > > > >> >> >> common,
> > >> > > > > >> > > > >> >> >> >    even though as you point out there is
> > >> > currently
> > >> > > no
> > >> > > > > use
> > >> > > > > >> > for
> > >> > > > > >> > > > >> some of
> > >> > > > > >> > > > >> >> >> them
> > >> > > > > >> > > > >> >> >> >    beyond broker-to-broker communication
> at
> > >> the
> > >> > > > moment.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> Yep.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> >    3. We should think a little about how
> > >> > versioning
> > >> > > > > will
> > >> > > > > >> > work.
> > >> > > > > >> > > > >> Making
> > >> > > > > >> > > > >> >> >> this
> > >> > > > > >> > > > >> >> >> >    convenient on the server side is an
> > >> important
> > >> > > goal
> > >> > > > > for
> > >> > > > > >> > the
> > >> > > > > >> > > > new
> > >> > > > > >> > > > >> >> style
> > >> > > > > >> > > > >> >> >> of
> > >> > > > > >> > > > >> >> >> >    request definition. At the
> serialization
> > >> level
> > >> > > we
> > >> > > > > now
> > >> > > > > >> > > handle
> > >> > > > > >> > > > >> >> >> versioning but
> > >> > > > > >> > > > >> >> >> >    the question we should discuss and work
> > >> out is
> > >> > > how
> > >> > > > > >> this
> > >> > > > > >> > > will
> > >> > > > > >> > > > >> map to
> > >> > > > > >> > > > >> >> >> the
> > >> > > > > >> > > > >> >> >> >    request objects (which I assume will
> > remain
> > >> > > > > >> unversioned).
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> The way I see it working (I just started on
> > >> this,
> > >> > so
> > >> > > I
> > >> > > > > may
> > >> > > > > >> > have
> > >> > > > > >> > > > >> gaps):
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> * Request header contains the version
> > >> > > > > >> > > > >> >> >> * When we read the request, we use
> > >> > > > > ProtoUtils.requestSchema
> > >> > > > > >> > > which
> > >> > > > > >> > > > >> >> >> takes version as a parameter and is
> > responsible
> > >> to
> > >> > > give
> > >> > > > > us
> > >> > > > > >> the
> > >> > > > > >> > > > right
> > >> > > > > >> > > > >> >> >> Schema, which we use to read the buffer and
> > get
> > >> the
> > >> > > > > correct
> > >> > > > > >> > > > struct.
> > >> > > > > >> > > > >> >> >> * KafkaApis handlers have the header, so
> they
> > >> can
> > >> > use
> > >> > > > it
> > >> > > > > to
> > >> > > > > >> > > access
> > >> > > > > >> > > > >> the
> > >> > > > > >> > > > >> >> >> correct fields, build the correct response,
> > etc.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> Does that sound about right?
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> >    4. Ideally after this refactoring the
> > >> network
> > >> > > > > package
> > >> > > > > >> > > should
> > >> > > > > >> > > > >> not be
> > >> > > > > >> > > > >> >> >> >    dependent on the individual request
> > >> objects.
> > >> > The
> > >> > > > > >> > intention
> > >> > > > > >> > > is
> > >> > > > > >> > > > >> that
> > >> > > > > >> > > > >> >> >> stuff in
> > >> > > > > >> > > > >> >> >> >    kafka.network is meant to be generic
> > >> network
> > >> > > > > >> > infrastructure
> > >> > > > > >> > > > >> that
> > >> > > > > >> > > > >> >> >> doesn't
> > >> > > > > >> > > > >> >> >> >    know about the particular fetch/produce
> > >> apis
> > >> > we
> > >> > > > have
> > >> > > > > >> > > > >> implemented on
> > >> > > > > >> > > > >> >> >> top.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> I'll make a note to validate that this is
> the
> > >> case.
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >> >> >
> > >> > > > > >> > > > >> >> >> > -Jay
> > >> > > > > >> > > > >> >> >> >
> > >> > > > > >> > > > >> >> >> > On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > >> > > > > >> > > > >> >> >> >
> > >> > > > > >> > > > >> >> >> >> Hi Jun,
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> I was taking a slightly different approach. Let me know if it makes
> > >> > > > > >> > > > >> >> >> >> sense to you:
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> 1. Get the bytes from network (kinda unavoidable...)
> > >> > > > > >> > > > >> >> >> >> 2. Modify RequestChannel.Request to contain header and body (instead
> > >> > > > > >> > > > >> >> >> >> of a single object)
> > >> > > > > >> > > > >> >> >> >> 3. Create the header and body from bytes as follows:
> > >> > > > > >> > > > >> >> >> >>     val header: RequestHeader = RequestHeader.parse(buffer)
> > >> > > > > >> > > > >> >> >> >>     val apiKey: Int = header.apiKey
> > >> > > > > >> > > > >> >> >> >>     val body: Struct =
> > >> > > > > >> > > > >> >> >> >>       ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
> > >> > > > > >> > > > >> >> >> >> 4. KafkaApis will continue getting RequestChannel.Request, but will
> > >> > > > > >> > > > >> >> >> >> now have access to body and header separately.
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> I agree that I need Request/Response objects that contain only the
> > >> > > > > >> > > > >> >> >> >> body for all request objects.
> > >> > > > > >> > > > >> >> >> >> I'm thinking of implementing them in o.a.k.Common.Requests in Java for
> > >> > > > > >> > > > >> >> >> >> consistency.
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> When we are discussing the requests/responses used in SimpleConsumer,
> > >> > > > > >> > > > >> >> >> >> we mean everything used in javaapi, right?
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> Gwen
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao <j...@confluent.io> wrote:
> > >> > > > > >> > > > >> >> >> >> > Hi, Gwen,
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> > I was thinking that we will be doing the following in KAFKA-1927.
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> > 1. Get the bytes from network.
> > >> > > > > >> > > > >> >> >> >> > 2. Use a new generic approach to convert bytes into request objects.
> > >> > > > > >> > > > >> >> >> >> > 2.1 Read the fixed request header (using the util in client).
> > >> > > > > >> > > > >> >> >> >> > 2.2 Based on the request id in the header, deserialize the rest of the
> > >> > > > > >> > > > >> >> >> >> > bytes into a request specific object (using the new java objects).
> > >> > > > > >> > > > >> >> >> >> > 3. We will then be passing a header and an AbstractRequestResponse to
> > >> > > > > >> > > > >> >> >> >> > KafkaApis.
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> > In order to do that, we will need to create similar request/response
> > >> > > > > >> > > > >> >> >> >> > objects for internal requests such as StopReplica, LeaderAndIsr,
> > >> > > > > >> > > > >> >> >> >> > UpdateMetadata, ControlledShutdown. Not sure whether they should be
> > >> > > > > >> > > > >> >> >> >> > written in java or scala, but perhaps they should be only in the core
> > >> > > > > >> > > > >> >> >> >> > project.
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> > Also note, there are some scala requests/responses used directly in
> > >> > > > > >> > > > >> >> >> >> > SimpleConsumer. Since that's our public api, we can't remove those
> > >> > > > > >> > > > >> >> >> >> > scala objects until the old consumer is phased out. We can remove the
> > >> > > > > >> > > > >> >> >> >> > rest of the scala request objects.
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> > Thanks,
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> > Jun
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> > On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > >> > > > > >> > > > >> >> >> >> >
> > >> > > > > >> > > > >> >> >> >> >> Hi,
> > >> > > > > >> > > > >> >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> >> I'm starting this thread for the various questions I run into while
> > >> > > > > >> > > > >> >> >> >> >> refactoring the server to use client requests and responses.
> > >> > > > > >> > > > >> >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> >> Help is appreciated :)
> > >> > > > > >> > > > >> >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> >> First question: LEADER_AND_ISR request and STOP_REPLICA request are
> > >> > > > > >> > > > >> >> >> >> >> unimplemented in the client.
> > >> > > > > >> > > > >> >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> >> Do we want to implement them as part of this refactoring?
> > >> > > > > >> > > > >> >> >> >> >> Or should we continue using the scala implementation for those?
> > >> > > > > >> > > > >> >> >> >> >>
> > >> > > > > >> > > > >> >> >> >> >> Gwen
> > >> > > > > >> > > > >> >> >> >> >>
> > >> > > > > >> > > > >> >> >> >>
> > >> > > > > >> > > > >> >> >>
> > >> > > > > >> > > > >> >>
> > >> > > > > >> > > > >>
> > >> > > > > >> > > > >
> > >> > > > > >> > > > >
> > >> > > > > >> > > >
> > >> > > > > >> > >
> > >> > > > > >> >
> > >> > > > > >>
> > >> > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
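
For reference, the flow discussed in the quoted thread (read a fixed header,
then pick the body schema from the api key and version found in that header)
could be sketched roughly as below. This is only an illustration under stated
assumptions: VersionedRequestSketch, Header, BodyReader and the field names
"timeout" and "retries" are hypothetical, and the sketch deliberately avoids
the real RequestHeader/ProtoUtils classes rather than guess at their
signatures.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class VersionedRequestSketch {

    /** Hypothetical fixed header: api key and version as two 16-bit fields. */
    static final class Header {
        final short apiKey;
        final short version;

        Header(short apiKey, short version) {
            this.apiKey = apiKey;
            this.version = version;
        }

        static Header parse(ByteBuffer buffer) {
            return new Header(buffer.getShort(), buffer.getShort());
        }
    }

    /** Hypothetical stand-in for a versioned schema: reads one body layout. */
    interface BodyReader {
        Map<String, Object> read(ByteBuffer buffer);
    }

    private static final Map<Integer, BodyReader> READERS = new HashMap<>();

    /** Packs (apiKey, version) into a single lookup key. */
    private static int key(int apiKey, int version) {
        return (apiKey << 16) | (version & 0xFFFF);
    }

    static {
        // Version 0 of hypothetical api 1: a single int field.
        READERS.put(key(1, 0), buf -> {
            Map<String, Object> body = new LinkedHashMap<>();
            body.put("timeout", buf.getInt());
            return body;
        });
        // Version 1 of the same api adds a second int field.
        READERS.put(key(1, 1), buf -> {
            Map<String, Object> body = new LinkedHashMap<>();
            body.put("timeout", buf.getInt());
            body.put("retries", buf.getInt());
            return body;
        });
    }

    /** Chooses the reader from the header, then reads the rest of the buffer. */
    static Map<String, Object> readBody(Header header, ByteBuffer buffer) {
        BodyReader reader = READERS.get(key(header.apiKey, header.version));
        if (reader == null) {
            throw new IllegalArgumentException(
                "No schema for api " + header.apiKey + " version " + header.version);
        }
        return reader.read(buffer);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(12);
        buffer.putShort((short) 1).putShort((short) 1).putInt(3000).putInt(5);
        buffer.flip();

        Header header = Header.parse(buffer);
        System.out.println(readBody(header, buffer));
    }
}
```

The point of the sketch is that the handler never needs one constructor per
version to read a request: the version travels in the header, and the lookup
table decides how the remaining bytes are interpreted.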
