[ 
https://issues.apache.org/jira/browse/KAFKA-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548797#comment-13548797
 ] 

David Arthur commented on KAFKA-643:
------------------------------------

+1 for splitting generic/specific parts of the API (this is basically what I do 
in my Python client).

+1 for specifying the protocol in a ~BNF form. This would require protocols to 
be specified as LL grammars (which they all are), which is required for 
efficient ByteBuffer packing/unpacking anyways. 

However, how would this scheme handle recursive definitions (like MessageSet)? 
I've always felt the depth of this should be limited to one, meaning a single 
Message can contain a compressed MessageSet which can only be composed of 
regular (uncompressed) Messages. In 
https://github.com/mumrah/kafka-python/blob/master/kafka/client.py#L355, I have 
to endlessly recurse to ensure I've fully consumed the messages - kind of a 
pain. If the depth was limited, I could decode it non-recursively. 

+0 for not using Avro et al. I understand the performance implications of using 
one of these frameworks, but it sure does make client development easier. 
However, as long as the protocol spec is clear (and correct) implementing a 
client is not so bad.

What about the Java API? As far as I can tell, the purpose of these classes is 
to delegate to the real APIs and handle Java -> Scala data type conversion. It 
seems like this should be able to be automatic/automagic. Although, I guess for 
the implicits stuff to work the Java classes must be present.

I know it's very new (Scala 10) and experimental, but macros might help in 
simplifying the APIs: http://docs.scala-lang.org/overviews/macros/overview.html.
                
> Refactor api definition layer
> -----------------------------
>
>                 Key: KAFKA-643
>                 URL: https://issues.apache.org/jira/browse/KAFKA-643
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>
> The way we are defining our protocol is really a bit embarrassing. It is full 
> of ad hoc serialization code for each API. This code is very fiddly and 
> opaque and when it has errors they are hard to debug. Since it is all done 
> one-off it is also very easy for it to become inconsistent. This was 
> tolerable when there were only two apis with a few fields each, but now there 
> are a half dozen more complex apis. By my count there is now over 1000 lines 
> of code in kafka.apis.*.
> One option would be to use protocol buffers or thrift or another 
> schema-oriented code gen RPC language. However I think this is probably the 
> wrong direction for a couple reasons. One is that we want something that 
> works well with our I/O model, both network and disk, which is very 
> NIO-centric. So it should work directly with ByteBuffers. Second I feel that 
> these systems complicate the specification of the protocol. They give a 
> schema, which is a great high-level description, but the translation of that 
> to bytes is essentially whatever their code-gen engine chooses to do. These 
> things are a great way to build application services, but not great for 
> something like what we are building.
> Instead I think we should do what we have done, specify the protocol as a 
> wiki. However we should write a little helper code to make our lives easier.
> Here is my recommendation for how this code would work. We add two helper 
> classes: Schema and Record.
> You define messages formats like this:
> import Types._
> val FetchRequestProtocol = 
>   Schema("ReplicaId"->int32, 
>                "MaxWaitTime"->int32, 
>                "MinBytes"->int32,
>                Seq("TopicName"->utf8,
>                       Seq("Partition"->int32, 
>                              "FetchOffset"->int64, 
>                              "MaxBytes"->int32)))
> Note that this almost exactly matches the BNF for the fetch request: 
>   
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> Once defined this schema can be used to parse messages:
>   val record: Record = FetchRequestProtocol.readFrom(buffer)
> A record is just a wrapper around an array. The readFrom method parses out 
> the fields specified in the schema and populates the array. Fields in the 
> record can be accessed by name, e.g. 
>   record("ReplicaId")
> For common access this is probably good enough. However since the position is 
> fixed, it is also possible to get the element by a Field object, which gets 
> rid of the hashmap lookup and goes directly to the right slot. E.g.
>   val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a 
> global variable
>   ...
>   record(ReplicaIdField)
> This will be for cases where we are a bit performance conscious and don't 
> want to do umpteen hashmap lookups to resolve string field names.
> Likewise the other direction, to write out a record:
>   record.writeTo(buffer)
> and to get the size in bytes:
>   record.size
> Implementing a single read, write, and size method with generic schemas will 
> not only make the underlying protocol clearly defined but also ensure good 
> error handling, error reporting, etc. It will be a bit slower, maybe not much 
> because we can optimize this code.
> I do realize that this is essentially what Avro or Thrift or ProtocolBuffers 
> do, but I think this is much simpler, and can be implemented in a few hundred 
> lines of code with no dependencies. Furthermore it is a way to implement our 
> protocol, not a way to define a protocol.
> In terms of how we use this, this is what I have in mind:
> I think we should split the apis into a generic and a specific portion. With 
> the generic piece being the header shared by all requests and responses, and 
> the specific portion being the bits for that message. I recommend we 
> officially implement versioning by allowing multiple versions of the schemas 
> and always looking up the right schema for the incoming and outgoing 
> messages. I think we can keep the existing case classes, and just map the 
> scala objects to and from the record instances in a wrapper layer prior to 
> the existing KafkaApis. The KafkaApis.handle method would disappear and 
> instead this wrapper would handle message deserialization and calling the 
> right method with the right request object.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to