-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Sept. 2, 2014, 1:09 a.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporated Jun's comments round two.

Incorporated Jun's comments.

1. I left some cases in Log since they are return values for some of their APIs.
2. I kept the fetch info in the delayed fetch metadata since it needs to be 
used for re-reading the log.
3. I kept the name of "callbackOnComplete" by following the principle that only 
the caller knows what the callback is used for, and hence they can name the 
callback as reponseCallback (from KafkaApi) and putCacheCallback (from 
OffsetManager), all the callee will take the callback as "callbackOnComplete".

Unit test passed, with some other notes:

1. Found and fix a bug in the current delayed fetch satisifaction logic: 
previously when we calculate the bytes, we do not take in the fetchMaxBytes 
into consideration as an upper limit for a single partition's log, but simply 
get the diff between the current HW/LEO and the fetch offset.
2. Found and fix a bug in the unit tests: we used to create replica manager on 
the fly but did not shut it down upon completing the test, which will leak the 
background thread (i.e. reaper thread of purgatory).
3. Changed the RequestPurgatory API a bit with Jun's comments. Now it has two 
implemented functions: forceComplete() and isCompleted(), and two functions 
that need to be instantiated in the subclasses: tryComplete() and complete(). 
Please let me know if people have more comments on the current API.
4. Cleaned the SimpleFetch test, previously this test is too complicate but it 
actually just test a simple logic of the replica manager.


Diffs
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 
51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala 
af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 
0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang

Reply via email to