Jay Kreps created KAFKA-643:
-------------------------------
Summary: Refactor api definition layer
Key: KAFKA-643
URL: https://issues.apache.org/jira/browse/KAFKA-643
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps
The way we are defining our protocol is really a bit embarrassing. It is full
of ad hoc serialization code for each API. This code is very fiddly and opaque,
and when it has errors they are hard to debug. Since it is all done one-off, it
is also very easy for it to become inconsistent. This was tolerable when there
were only two APIs with a few fields each, but now there are half a dozen more
complex APIs. By my count there are now over 1000 lines of code in kafka.apis.*.
One option would be to use Protocol Buffers or Thrift or another
schema-oriented code-gen RPC language. However, I think this is probably the
wrong direction for a couple of reasons. First, we want something that works
well with our I/O model, both network and disk, which is very NIO-centric, so
it should work directly with ByteBuffers. Second, I feel that these systems
complicate the specification of the protocol. They give a schema, which is a
great high-level description, but the translation of that to bytes is
essentially whatever their code-gen engine chooses to do. These things are a
great way to build application services, but not great for something like what
we are building.
Instead I think we should keep doing what we have done: specify the protocol on
the wiki. However, we should write a little helper code to make our lives easier.
Here is my recommendation for how this code would work. We add two helper
classes: Schema and Record.
You define message formats like this:
import Types._
val FetchRequestProtocol =
  Schema("ReplicaId" -> int32,
         "MaxWaitTime" -> int32,
         "MinBytes" -> int32,
         Seq("TopicName" -> utf8,
             Seq("Partition" -> int32,
                 "FetchOffset" -> int64,
                 "MaxBytes" -> int32)))
Note that this almost exactly matches the BNF for the fetch request:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
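To make the shape of such a definition concrete, here is a minimal sketch of how the schema could be represented as plain data. Everything in it is an assumption for illustration: the names Type, ArrayOf, Protocols and describe are hypothetical, and unlike the snippet above it gives each repeated group a name ("Topics", "Partitions") so that nested fields can be addressed.

```scala
// Hypothetical sketch only: none of these names exist in Kafka.
sealed trait Type
case object Int32 extends Type
case object Int64 extends Type
case object Utf8 extends Type
// A repeated group of fields (the [ ... ] brackets in the wiki BNF).
final case class ArrayOf(fields: Seq[(String, Type)]) extends Type

final case class Schema(fields: (String, Type)*) {
  // One line per field, indenting the contents of each repeated group.
  def describe: String = render(fields, "")

  private def render(fs: Seq[(String, Type)], indent: String): String =
    fs.map {
      case (name, ArrayOf(inner)) => s"$indent[$name]\n" + render(inner, indent + "  ")
      case (name, tpe)            => s"$indent$name: $tpe"
    }.mkString("\n")
}

object Protocols {
  // Field names follow the fetch request above; the group names are assumed.
  val FetchRequest: Schema = Schema(
    "ReplicaId" -> Int32,
    "MaxWaitTime" -> Int32,
    "MinBytes" -> Int32,
    "Topics" -> ArrayOf(Seq(
      "TopicName" -> Utf8,
      "Partitions" -> ArrayOf(Seq(
        "Partition" -> Int32,
        "FetchOffset" -> Int64,
        "MaxBytes" -> Int32)))))
}
```

Calling Protocols.FetchRequest.describe yields an indented listing that can be compared by eye against the BNF on the wiki.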
Once defined this schema can be used to parse messages:
val record: Record = FetchRequestProtocol.readFrom(buffer)
A record is just a wrapper around an array. The readFrom method parses out the
fields specified in the schema and populates the array. Fields in the record
can be accessed by name, e.g.
record("ReplicaId")
For common access this is probably good enough. However since the position is
fixed, it is also possible to get the element by a Field object, which gets rid
of the hashmap lookup and goes directly to the right slot. E.g.
// do this as a global variable
val ReplicaIdField = FetchRequestProtocol("ReplicaId")
...
record(ReplicaIdField)
This will be for cases where we are a bit performance conscious and don't want
to do umpteen hashmap lookups to resolve string field names.
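The two access paths could look roughly like the sketch below. It assumes a flat schema of fixed-width fields only, and the class layout (Field carrying its slot index, Record wrapping an Array[Any]) is one possible reading of the description above, not a settled design. Demo and sample are hypothetical helpers.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch: a flat schema of fixed-width fields only.
sealed trait Type { def read(buf: ByteBuffer): Any }
case object Int32 extends Type { def read(buf: ByteBuffer): Any = buf.getInt() }
case object Int64 extends Type { def read(buf: ByteBuffer): Any = buf.getLong() }

// A Field remembers its slot, so lookups by Field skip the name map.
final case class Field(index: Int, name: String, tpe: Type)

final class Schema(defs: (String, Type)*) {
  val fields: IndexedSeq[Field] =
    defs.zipWithIndex.map { case ((n, t), i) => Field(i, n, t) }.toIndexedSeq
  private val byName: Map[String, Field] = fields.map(f => f.name -> f).toMap

  // Resolve a field name once; callers can keep the result in a val.
  def apply(name: String): Field =
    byName.getOrElse(name, throw new NoSuchElementException(s"Unknown field: $name"))

  // Read each field in declaration order into the record's array.
  def readFrom(buf: ByteBuffer): Record =
    new Record(this, fields.map(_.tpe.read(buf)).toArray)
}

final class Record(val schema: Schema, values: Array[Any]) {
  def apply(name: String): Any = values(schema(name).index) // map lookup, then slot
  def apply(field: Field): Any = values(field.index)        // slot only
}

object Demo {
  val HeaderSchema = new Schema("ReplicaId" -> Int32, "MaxWaitTime" -> Int32, "MinBytes" -> Int32)
  val ReplicaIdField: Field = HeaderSchema("ReplicaId")

  // Builds a buffer holding ReplicaId = -1, MaxWaitTime = 100, MinBytes = 1.
  def sample(): ByteBuffer = {
    val b = ByteBuffer.allocate(12)
    b.putInt(-1).putInt(100).putInt(1)
    b.flip()
    b
  }
}
```

In this sketch a Field is only meaningful for the schema it came from; a real version would probably want to check that.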
Likewise the other direction, to write out a record:
record.writeTo(buffer)
and to get the size in bytes:
record.size
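As a rough illustration of the write side, the sketch below gives each type a write and a sizeOf, so that a record's size and serialization fall out of one generic loop. The type names and the Record constructor are hypothetical, and the string encoding (2-byte length followed by UTF-8 bytes) is an assumption made for the example.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical sketch: each type knows how to write a value and report its size.
sealed trait Type {
  def write(buf: ByteBuffer, value: Any): Unit
  def sizeOf(value: Any): Int
}
case object Int32 extends Type {
  def write(buf: ByteBuffer, value: Any): Unit = { buf.putInt(value.asInstanceOf[Int]); () }
  def sizeOf(value: Any): Int = 4
}
case object Int64 extends Type {
  def write(buf: ByteBuffer, value: Any): Unit = { buf.putLong(value.asInstanceOf[Long]); () }
  def sizeOf(value: Any): Int = 8
}
case object Utf8 extends Type {
  // Assumed encoding: 2-byte length, then the UTF-8 bytes.
  def write(buf: ByteBuffer, value: Any): Unit = {
    val bytes = value.asInstanceOf[String].getBytes(UTF_8)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
    ()
  }
  def sizeOf(value: Any): Int = 2 + value.asInstanceOf[String].getBytes(UTF_8).length
}

final class Record(types: Seq[Type], values: Seq[Any]) {
  require(types.length == values.length, "one value per schema field")

  // Total serialized size: the sum of the per-field sizes.
  def size: Int = types.zip(values).map { case (t, v) => t.sizeOf(v) }.sum

  // Write every field in schema order.
  def writeTo(buf: ByteBuffer): Unit =
    types.zip(values).foreach { case (t, v) => t.write(buf, v) }
}
```

Because size and writeTo walk the same list of types, they cannot drift apart the way hand-written per-API code can.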
Implementing a single read, write, and size method with generic schemas will
not only make the underlying protocol clearly defined but also ensure good
error handling, error reporting, etc. It will be a bit slower than the
hand-written code, though probably not by much, since we can optimize this one
code path.
I do realize that this is essentially what Avro or Thrift or ProtocolBuffers
do, but I think this is much simpler, and can be implemented in a few hundred
lines of code with no dependencies. Furthermore it is a way to implement our
protocol, not a way to define a protocol.
In terms of how we use this, this is what I have in mind:
I think we should split the apis into a generic and a specific portion, with
the generic piece being the header shared by all requests and responses, and
the specific portion being the bits for that message. I recommend we officially
implement versioning by allowing multiple versions of the schemas and always
looking up the right schema for the incoming and outgoing messages. I think we
can keep the existing case classes, and just map the scala objects to and from
the record instances in a wrapper layer prior to the existing KafkaApis. The
KafkaApis.handle method would disappear and instead this wrapper would handle
message deserialization and calling the right method with the right request
object.
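A sketch of the version lookup, under loud assumptions: the header is taken to start with a 2-byte api key and a 2-byte version (the rest of the header is omitted), the key and version numbers are illustrative, and "ExampleNewField" is a made-up field standing in for whatever a later version might add. Dispatcher and schemaFor are hypothetical names.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch: a schema is reduced to a name and its field names.
final case class Schema(name: String, fieldNames: Seq[String])

object Dispatcher {
  val FetchV0 = Schema("FetchRequest v0", Seq("ReplicaId", "MaxWaitTime", "MinBytes"))
  // "ExampleNewField" is invented to show a second version; it is not a real field.
  val FetchV1 = Schema("FetchRequest v1", Seq("ReplicaId", "MaxWaitTime", "MinBytes", "ExampleNewField"))

  // (api key, version) -> schema. The numbers are illustrative.
  val schemas: Map[(Short, Short), Schema] = Map(
    (1.toShort, 0.toShort) -> FetchV0,
    (1.toShort, 1.toShort) -> FetchV1)

  // Generic portion: read api key and version from the header, then pick the
  // specific schema. Unknown combinations are reported instead of mis-parsed.
  def schemaFor(buf: ByteBuffer): Either[String, Schema] = {
    val apiKey = buf.getShort()
    val version = buf.getShort()
    schemas.get((apiKey, version)).toRight(s"No schema for api key $apiKey, version $version")
  }
}
```

The wrapper layer would then parse the body with the returned schema, map the record to the matching case class, and call the right method.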