-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------


We can probably remove DelayedItem if it is not referenced by anyone any more.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment123994>

    Add "override" keyword to indicate it is extended from the Runnable. Also, 
the comments on top seems be referring to a variable, not a function, which is 
a bit misleading.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124005>

    We need to make tickMs and wheelSize configurable.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124007>

    TBD



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124136>

    It seems we do not need to keep this as a class member variable, but just 
compute the value in purge() on-the-fly every time.



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124140>

    Does it require to sync on refQueue as well?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124134>

    It may be useful to return #.purged items?



core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment124135>

    The threshold should be configurable.



core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment124361>

    I think bucket.flush(reinsurt) will always fail on all the items since 
their expiration time will always < bucket expiration + ticketMs, i.e. the 
returned bucket from the delayed queue has already expired all its items. In 
this case, could we just call foreach(submit) on all of them instead of trying 
to reinsurt them?



core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
<https://reviews.apache.org/r/31568/#comment124401>

    It seems the task entry of the task will only be set once throughout its 
life time; even when the task entry gets reinsurted its correspondence to the 
task will not change, right?
    
    If that is true we can just set the entry for the task in the constructor 
of the task entry.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment124359>

    How about change this comment to:
    
    We only need to enqueue the bucket when its expieration time has changed, 
i.e. the wheel has advanced one cycle and the previous buckets gets reused; 
further calls to set the expiration within the same wheel cycle will pass in 
the same value and hence return false, thus the bucket with the same expiration 
will not be enqueued multiple times.



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment124462>

    Could we just add an atomic integer recording the list size and size() 
function to TimerTaskList?



core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
<https://reviews.apache.org/r/31568/#comment124459>

    latch.await(0, TimeUnit.SECONDS)?


Since this a rather complicated patch (even after reading the wiki page I took 
quite some time to get through the code), I would suggest adding more comments 
on the functions / member variables of each classes.

- Guozhang Wang


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
> 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>

Reply via email to