> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116>
> >
> >     We need to make tickMs and wheelSize configurable.
> 
> Yasuhiro Matsuda wrote:
>     What is the motivation? I don't think it is a good idea to allow users to 
> configure them.

I am not concerning about user-configurability. The purgatory is used by 
multiple request types: produce, fetch and in the future rebalance, heartbeat 
and join group, different request type may need to set the tickMs and wheelSize 
differently.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187>
> >
> >     TBD

Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return 
Boolean instead of Unit indicating if the task has not expired and successfully 
added to the timer. And then we can change above as

if (!operation.isComplete()) {
  if (!timeoutTimer.add(operation) {
    operation.cancel()
  }
}


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288
> > <https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line288>
> >
> >     It may be useful to return #.purged items?
> 
> Yasuhiro Matsuda wrote:
>     What is the use?

At line 316 / 317 we could log on trace level whether the clock advance expired 
any tasks and the #.purged items.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > <https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68>
> >
> >     I think bucket.flush(reinsurt) will always fail on all the items since 
> > their expiration time will always < bucket expiration + ticketMs, i.e. the 
> > returned bucket from the delayed queue has already expired all its items. 
> > In this case, could we just call foreach(submit) on all of them instead of 
> > trying to reinsurt them?
> 
> Yasuhiro Matsuda wrote:
>     It is true only for the lowest wheel. Reinsert is necessary to make 
> timing wheels work. A bucket from a higher wheel may contain tasks not 
> expired (a tick time is longer in a higher wheel).

OK, I may miss sth. here, but this is my reasoning:

The bucket is only returned from delayed queue in line 62 if its expiration 
time has passed currentTime, after that at least the lowest wheel will advance 
to its expiration time, and hence add call within the reinsert is doomed to 
fail as task.expirationTime < wheel's time + tickMs.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > <https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72>
> >
> >     It seems the task entry of the task will only be set once throughout 
> > its life time; even when the task entry gets reinsurted its correspondence 
> > to the task will not change, right?
> >     
> >     If that is true we can just set the entry for the task in the 
> > constructor of the task entry.
> 
> Yasuhiro Matsuda wrote:
>     This sets TimerTaskEntry to TimerTask. TimeTask is created independently 
> from a Timer, then enqueued to a Timer.

Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 
46 of Timer when we create the TimerTaskEntry with the passed in TimerTask its 
entry field will be set automatically.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
-----------------------------------------------------------


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> -----------------------------------------------------------
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
>     https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> new purgatory implementation
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
> 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>

Reply via email to