-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78444
-----------------------------------------------------------

Thanks for the patch! A few comments below.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127361>

timeoutTimer is an implementation detail in ExpiredOperationReaper. Could we hide it inside ExpiredOperationReaper and expose the needed APIs?


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127363>

Do we need to do that or could we just let the expirationReaper handle it?


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127368>

Our convention is to only use () for methods that have side effects. So, next will use (), but hasNext will not.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127369>

Is the null check necessary? If hasNext is true, the next item will always be available, right?


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127373>

Reworded the description a bit. Is that clearer?

Trigger a purge if the number of completed but still being watched operations is larger than the purge threshold. That number is computed as the difference between the estimated total number of operations and the number of pending delayed operations.


core/src/main/scala/kafka/server/DelayedOperation.scala
<https://reviews.apache.org/r/31568/#comment127374>

are be completed => are completed


core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127429>

Do you think the defaults for tickMs and wheelSize are good enough that we don't need to expose them as configs?


core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127167>

Could we use inLock() where applicable?


core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127225>

Do we need to run this in a separate thread?
Could we just run this in the expireReaper thread as before?


core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127423>

Do we need to pass in the timeoutMs or could we just hardcode the 200ms in poll()? The timeout is just so that we can shut down the expiration reaper thread since we don't interrupt it.


core/src/main/scala/kafka/utils/timer/Timer.scala
<https://reviews.apache.org/r/31568/#comment127228>

Would it be simpler to just handle one poll item and return? The outer loop will call this method again on the next item. Also, poll() can block. So the expiration reaper thread may not be able to shut down if the queue is empty.


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127198>

Could we explain how TimingWheel works in the comments?


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127205>

tickSizeMs => tickMs


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127210>

Put in its own bucket?


core/src/main/scala/kafka/utils/timer/TimingWheel.scala
<https://reviews.apache.org/r/31568/#comment127224>

It seems that all wheels at different hierarchies always have the same startMs. Does that mean that the first bucket in the coarser level wheel is never used?


core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
<https://reviews.apache.org/r/31568/#comment127380>

The message string is incorrect. We expect a total of 2 delayed operations. Also, the value after "instead of" is missing. Ditto for the other assertEquals calls in this test.


core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment127383>

Do we need both checks? It seems that one of them is enough.


core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
<https://reviews.apache.org/r/31568/#comment127415>

Not quite sure what this is testing.
It's not clear to me why the sharedCounter won't increase after add. Perhaps, we can add some comments.


core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
<https://reviews.apache.org/r/31568/#comment127417>

Should we define expirationMs as override to make it clear?


core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
<https://reviews.apache.org/r/31568/#comment127421>

map() probably should be changed to foreach().


- Jun Rao


On March 20, 2015, 3:45 p.m., Yasuhiro Matsuda wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
>
> (Updated March 20, 2015, 3:45 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
>
>
> Repository: kafka
>
>
> Description
> -------
>
> new purgatory implementation
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/31568/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yasuhiro Matsuda
>
>
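
For reference, the purge condition reworded in the DelayedOperation.scala comment above (purge when the number of completed but still watched operations exceeds the purge threshold) can be sketched roughly as follows. This is only a minimal sketch of the arithmetic: the object name PurgeCheck, the method names, and the parameter names are hypothetical and are not taken from the patch.

```scala
// Hypothetical illustration; none of these names come from the patch.
object PurgeCheck {

  /** Estimated number of operations that are completed but still watched:
    * the estimated total minus the operations still pending (delayed). */
  def completedButWatched(estimatedTotal: Int, pendingDelayed: Int): Int =
    estimatedTotal - pendingDelayed

  /** True when that estimate is strictly larger than the purge threshold. */
  def shouldPurge(estimatedTotal: Int, pendingDelayed: Int, purgeThreshold: Int): Boolean =
    completedButWatched(estimatedTotal, pendingDelayed) > purgeThreshold
}
```

With an estimated total of 1000 operations, 700 still pending, and a threshold of 200, the difference is 300, so a purge would be triggered under this reading of the description.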