-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23516/
-----------------------------------------------------------

(Updated July 15, 2014, 6:36 p.m.)


Review request for kafka.


Bugs: KAFKA-1462
    https://issues.apache.org/jira/browse/KAFKA-1462


Repository: kafka


Description (updated)
-------

1. I kept the request objects in the server separate from those in the client.
This is because (1) some of the existing request objects are part of the old
client API (FetchRequest, OffsetCommitRequest, etc.) and we can't remove them
until the old clients are removed, and (2) changing the existing request
objects on the server side requires significant refactoring.

2. On the client side, I refactored the existing request objects a bit. Now,
every request/response object extends GenericStruct, which provides a standard
way of doing serialization and toString so that we don't have to implement them
in every request. Each request/response can be constructed in two ways:
(1) by providing request-specific fields; (2) by providing a struct. The latter
is used for getting a request/response from its serialized format.

3. On the server side, I kept the existing requests more or less untouched.
For new types of requests, I created a thin wrapper on the server side so that
it can leverage the request objects created on the client side. This way, the
server-side object shares the serialization and toString logic with the
client-side object. In order to do this, I removed correlationId from the
RequestOrResponse object. There is only one place where correlationId is
directly referenced, and that reference is not necessary.

4. Multi-version support. We now need to support two versions of
OffsetCommitRequest because the new consumer work added two extra fields to
the request. For simplicity, both versions are converted to the same request
object. Since the old version doesn't have the new fields, default values are
used for them.

5. The new requests/responses are based on the format described in
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Requestformats.
I made some minor changes to the wiki so that the new requests follow the
current standard.

6. Added some missing util functions and a unit test for the
serialization/deserialization logic.
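
As an illustration of item 2 (not code from this patch), here is a minimal
sketch of a base class that owns serialization and toString, with a request
that can be built either from its fields or from a struct. SketchStruct,
GenericStructSketch, HeartbeatRequestSketch, the field names, and the wire
layout are all hypothetical stand-ins; the real GenericStruct and Struct
classes may look quite different.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a struct of named fields; NOT Kafka's Struct class.
final class SketchStruct {
    final Map<String, Object> fields = new LinkedHashMap<>();

    SketchStruct set(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    Object get(String name) {
        return fields.get(name);
    }
}

// Hypothetical base class: serialization and toString are written once here.
abstract class GenericStructSketch {
    protected final SketchStruct struct;

    protected GenericStructSketch(SketchStruct struct) {
        this.struct = struct;
    }

    // Writes fields in insertion order: strings as int16 length + UTF-8 bytes,
    // integers as int32. The fixed 1024-byte buffer is a simplification.
    byte[] serialize() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for (Object value : struct.fields.values()) {
            if (value instanceof String) {
                byte[] bytes = ((String) value).getBytes(StandardCharsets.UTF_8);
                buffer.putShort((short) bytes.length).put(bytes);
            } else {
                buffer.putInt((Integer) value);
            }
        }
        byte[] out = new byte[buffer.position()];
        buffer.flip();
        buffer.get(out);
        return out;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + struct.fields;
    }
}

// Hypothetical request showing the two construction paths.
final class HeartbeatRequestSketch extends GenericStructSketch {
    // Path (1): build from request-specific fields.
    HeartbeatRequestSketch(String groupId, int generationId) {
        super(new SketchStruct()
            .set("group_id", groupId)
            .set("generation_id", generationId));
    }

    // Path (2): wrap a struct, e.g. one read back from the serialized format.
    HeartbeatRequestSketch(SketchStruct struct) {
        super(struct);
    }

    // Reads the assumed wire layout into a struct, then uses path (2).
    static HeartbeatRequestSketch parse(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] group = new byte[buffer.getShort()];
        buffer.get(group);
        SketchStruct struct = new SketchStruct()
            .set("group_id", new String(group, StandardCharsets.UTF_8))
            .set("generation_id", buffer.getInt());
        return new HeartbeatRequestSketch(struct);
    }

    String groupId() {
        return (String) struct.get("group_id");
    }

    int generationId() {
        return (Integer) struct.get("generation_id");
    }
}
```

In this sketch, HeartbeatRequestSketch.parse(request.serialize()) returns an
object with the same field values as the original, and neither serialize nor
toString had to be written in the subclass.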

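The multi-version handling in item 4 could be sketched roughly as follows:
both wire versions decode into one object, and the older version fills the
missing fields with defaults. This is only an illustration under stated
assumptions. The class OffsetCommitSketch, the two extra field names
(generationId, consumerId), the default values, and the wire layout are
hypothetical and are not taken from the patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical single in-memory form shared by both wire versions.
final class OffsetCommitSketch {
    // Assumed defaults used when decoding the older version.
    static final int DEFAULT_GENERATION_ID = -1;
    static final String DEFAULT_CONSUMER_ID = "";

    final String groupId;
    final int generationId;   // assumed extra field, present only in version 1
    final String consumerId;  // assumed extra field, present only in version 1

    OffsetCommitSketch(String groupId, int generationId, String consumerId) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.consumerId = consumerId;
    }

    // Version 0 writes groupId only; version 1 also writes the two extra fields.
    byte[] encode(int version) {
        byte[] group = groupId.getBytes(StandardCharsets.UTF_8);
        byte[] consumer = consumerId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + group.length + 4 + 2 + consumer.length);
        buffer.putShort((short) group.length).put(group);
        if (version >= 1) {
            buffer.putInt(generationId);
            buffer.putShort((short) consumer.length).put(consumer);
        }
        byte[] out = new byte[buffer.position()];
        buffer.flip();
        buffer.get(out);
        return out;
    }

    // Both versions are converted to the same object; version 0 gets defaults.
    static OffsetCommitSketch decode(int version, byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        String groupId = readString(buffer);
        if (version == 0) {
            return new OffsetCommitSketch(groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID);
        }
        if (version == 1) {
            int generationId = buffer.getInt();
            String consumerId = readString(buffer);
            return new OffsetCommitSketch(groupId, generationId, consumerId);
        }
        throw new IllegalArgumentException("Unsupported version: " + version);
    }

    // Reads an int16 length followed by that many UTF-8 bytes.
    private static String readString(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.getShort()];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The trade-off in this sketch is that callers only ever see one request type,
at the cost of not being able to tell a defaulted field from one that was
explicitly set to the default value.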

Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java a016269512b6d6d6e0fd3fab997e9c8265024eb4
  clients/src/main/java/org/apache/kafka/common/Cluster.java c62707ab3aba26771fc4b993df28bf8c44f32309
  clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 6fe7573973832615976defa37fe0dfbb8f911939
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 044b03061802ee5e8ea4f1995fb0988e1a70e9a7
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 8cecba50bf067713184208552af36469962cd628
  clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java f35bd87cf0c52a30ed779b25d60bbe64a60b9502
  clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 2652c32f123b3bc4b0456d4bc9fbba52c051724c
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java PRE-CREATION
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 6036f6af1c55c1b0a15471e79b229b17f50ce31c
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 6cf4fb714916f1a318d788cde8fc0aad9dfb83ca
  clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java 66cc2fea6443968e525419a203dbc4227e0b1cdf
  clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java 257b8287757e40349ea041ed7a651993007a55a8
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 2f98192b064d1ce7c0779e901293edb8c3801915
  clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java PRE-CREATION
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala dfad6e6534dd9b00099d110804899080e8d832ab
  core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala c72ca14708a3625cb89d5fb92630138d2afa2bf0
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 7dacb2023788064b736df8b775aaf12281d545b5
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 46ec3db28f88bbf9e0b0de2133807dc552bcae13
  core/src/main/scala/kafka/api/FetchRequest.scala a8b73acd1a813284744359e8434cb52d22063c99
  core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala PRE-CREATION
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala PRE-CREATION
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala PRE-CREATION
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala PRE-CREATION
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala PRE-CREATION
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 3e408174dcc7e8dd9097bae41277ee4f7160afb3
  core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala f63644448bb5a1d560f79427284ccbac9d46b789
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 630768ab57afb579049bcbc5d44ee6823b0e7cc2
  core/src/main/scala/kafka/api/OffsetCommitResponse.scala 4946e9729ecbf3da35bdab5c832d26977c107e9e
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala a32f8588ff02f5fb3c99fb8e5508f462923e8edc
  core/src/main/scala/kafka/api/OffsetFetchResponse.scala c1222f422ddb6413bbb2e5da2980903ee70b9156
  core/src/main/scala/kafka/api/OffsetRequest.scala 7cbc26c6e38420aa57046a76087fe6d15df72477
  core/src/main/scala/kafka/api/OffsetResponse.scala 0e1d6e362a1cec8250cf3930d3046058be4ae192
  core/src/main/scala/kafka/api/ProducerRequest.scala 0c295a2fe6712a77cd24719cb42015e2f787b08d
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039
  core/src/main/scala/kafka/api/RequestKeys.scala fbfc9d3aeaffed4ca85902125fcc1050086835db
  core/src/main/scala/kafka/api/RequestOrResponse.scala 57f87a48c5e87220e7f377b23d2bbfa0d16350dc
  core/src/main/scala/kafka/api/StopReplicaRequest.scala 68fc1389ee71122adb716d9d821dd8987a78ecee
  core/src/main/scala/kafka/api/StopReplicaResponse.scala c90ddee3d820472236ab554cddd2e0db24233ae3
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala a319f2f438bfd84c63edb330685b2b41d4b08aa0
  core/src/main/scala/kafka/api/TopicMetadataResponse.scala f6b7429faeab34d0938cb2f78ce91021be7c4b85
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 543e262b25a946abd84dd58dc5fcee67c6252375
  core/src/main/scala/kafka/api/UpdateMetadataResponse.scala c583c1f00c89a993fb9dc280f190c32ea895dca5
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 8763968fbff697e4c5c98ab1274627c192a4d26a
  core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 08dcc5553ccac7fbec0ea2662b402e2cec079e48
  core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 7e6da164a26b1893c26c624a9998d4fedf8af95e
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a2117b34c2ee3554602fe068eed0c90b075958c1

Diff: https://reviews.apache.org/r/23516/diff/


Testing
-------


Thanks,

Jun Rao
