> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 338
> > <https://reviews.apache.org/r/25995/diff/13/?file=793041#file793041line338>
> >
> >     Can we make this a "reliable" commit - i.e., with retries up to the 
> > configured retry count? The policy is retry on commit errors during 
> > rebalance or shutdown, no need to retry on commit errors during 
> > auto-commits.
> >     
> >     So, for example, if a mirror maker rebalances and there is 
> > simultaneously offset manager movement, we would need to retry the commit.
> >     
> >     This is the motivation for the isAutoCommit flag - however, there seems 
> > to be a bug right now which maybe you can fix: if this is not an 
> > auto-commit, set retries to the configured retries; otherwise, no retries.
> 
> Jiangjie Qin wrote:
>     Changed the code based on your suggestion. My original thinking was that 
> in the mirror maker a single commit failure does not matter too much, because 
> the next commit will succeed if the failure is due to offset topic leader 
> migration, etc. But for more general use, it probably should retry if it is 
> not an auto commit.

I was thinking more about the shutdown and rebalance cases. We ideally want the 
commits to be reliable for those cases.
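Roughly, the retry policy could be sketched like this (illustrative Scala, not 
the actual connector code; `doCommit` and the parameter names are stand-ins 
for the real commit call and config):

```scala
// Sketch of the policy discussed above: retry up to the configured count on
// rebalance/shutdown commits, no retries for auto-commits. `doCommit` stands
// in for the actual offset commit and returns whether it succeeded; all
// names here are illustrative.
def commitWithRetries(isAutoCommit: Boolean, configuredRetries: Int)
                     (doCommit: () => Boolean): Boolean = {
  val retries = if (isAutoCommit) 0 else configuredRetries
  var attempts = 0
  var done = false
  while (!done && attempts <= retries) {
    done = doCommit()
    attempts += 1
  }
  done
}
```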


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 192
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line192>
> >
> >     Why do you need a dummy param?
> 
> Jiangjie Qin wrote:
>     Because Utils.createObject needs an args parameter, and if we pass in 
> null it gives an NPE...
>     I've changed the code in Utils to allow passing in null, which uses the 
> no-arg constructor.

See comment in latest RB.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 489
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line489>
> >
> >     Why not use KafkaScheduler for the offset commit task?
> 
> Jiangjie Qin wrote:
>     Hadn't thought of that before... But it looks like we need to do some 
> more handling when something goes wrong in the offset commit threads, and 
> the KafkaScheduler code does not seem to do that.

You can make the task itself catch throwables. It would look something like 
this:

    scheduler.schedule("mirrorMakerOffsetCommitter", commitTask, ...)

And in commitTask:

    try {
      commitOffsets()
    }
    catch {
      case t: Throwable =>
        // handle
    }

That said, I don't think connector.commitOffsets will throw anything - since we 
catch all throwables there.

The only additional detail is that after you shutdown the scheduler you will 
need to call commitOffsets() manually one last time.
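So the overall shutdown path would look something like this (a sketch with an 
illustrative stand-in trait, not the real KafkaScheduler API):

```scala
// Illustrative stand-in for KafkaScheduler; only shutdown() matters here.
trait Scheduler { def shutdown(): Unit }

// Sketch of the shutdown ordering discussed above: stop the periodic commit
// task first, then commit once more so offsets consumed after the last
// scheduled tick are not lost.
def shutdownAndCommit(scheduler: Scheduler, commitOffsets: () => Unit): Unit = {
  scheduler.shutdown()   // no further scheduled commits after this returns
  commitOffsets()        // one final manual commit
}
```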


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 614
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line614>
> >
> >     There might be a small memory leak here: if there was an error though 
> > (in the branch above) it seems no one removes the offset from the list.
> 
> Jiangjie Qin wrote:
>     Yes, that is a memory leak, but in this case we should not commit the 
> offset of the message that was not sent successfully either. If any 
> exception occurs, then the offsets will not advance anymore. We probably 
> should have an alert on the mirror maker's consumer lag.

Hmm.. not sure if I'm on the same page here. See comments on new RB.


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 705
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line705>
> >
> >     You need to also set tail.next = null (or None)
> 
> Jiangjie Qin wrote:
>     tail.next = null is handled in the previous if...else: the old 
> tail.prev.next becomes the new tail.next, which will be null.

Makes sense.
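For reference, here is the tail-removal case as a simplified sketch (not the 
actual MirrorMaker offset list implementation):

```scala
// Simplified doubly linked list illustrating the tail-removal case discussed
// above; names and structure are illustrative only.
class Node[T](var value: T) { var prev: Node[T] = null; var next: Node[T] = null }

class DoublyLinkedList[T] {
  var head: Node[T] = null
  var tail: Node[T] = null

  def append(v: T): Node[T] = {
    val n = new Node(v)
    if (tail == null) { head = n; tail = n }
    else { tail.next = n; n.prev = tail; tail = n }
    n
  }

  def remove(n: Node[T]): Unit = {
    // When n is the old tail, n.next is null, so the line below already
    // sets the new tail's next to null - the point made above.
    if (n.prev != null) n.prev.next = n.next else head = n.next
    if (n.next != null) n.next.prev = n.prev
    else tail = n.prev
  }
}
```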


> On Dec. 17, 2014, 1:17 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 666
> > <https://reviews.apache.org/r/25995/diff/13/?file=793043#file793043line666>
> >
> >     Should we expose a size() method - i.e., increment on add and decrement 
> > on remove. We can aggregate the size of all the offset lists outside and 
> > emit a gauge. That will give us some assurance that there are no 
> > "forgotten" offsets. Re: the potential leak mentioned above.
> >     
> >     In fact, I'm a bit nervous about correctness since this is a custom 
> > implementation of a semi-non-trivial data structure. We should probably 
> > even assert that it is empty when numMessageUnacked goes to zero as part of 
> > the rebalance.
> >     
> >     Ideally, these custom implementations need a full-fledged unit test.
> 
> Jiangjie Qin wrote:
>     That's a good point; we probably need a metric to see if we have stuck 
> offsets. But those stuck offsets should not be committed anyway, and we need 
> to be alerted when that situation happens. Maybe it would be better to add 
> an assertion in the exception block where a stuck offset occurs?

Right - I think we can add a metric and/or add the assertion. The only problem 
with asserting is the small duplicate issue that you pointed out (i.e., a 
slight chance that one message is held by a producer thread and not yet handed 
to the producer).

I don't think we should block the mirror maker though, as mentioned in the new 
RB. That would be handled by setting infinite retries, in which case you 
shouldn't really hit the exception condition in the producer callback.
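For the size() idea, a minimal sketch could look like this (illustrative only; 
the real code would register a Kafka metrics gauge over the aggregated sizes):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Sketch of the size() suggestion above: increment on add, decrement on
// remove, and let an external gauge aggregate the sizes of all offset lists
// to catch "forgotten" offsets. Not the actual MirrorMaker offset list.
class TrackedOffsetList {
  private val pending = new AtomicInteger(0)
  def addOffset(offset: Long): Unit = pending.incrementAndGet()
  def removeOffset(offset: Long): Unit = pending.decrementAndGet()
  def size: Int = pending.get()
}
```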


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review65306
-----------------------------------------------------------


On Dec. 17, 2014, 8:29 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 17, 2014, 8:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter the send was successful or 
> not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Incorporated Jun's comments
> 
> 
> Incorporated Jun's comments and rebased on trunk
> 
> 
> Rebased on current trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> e991d2187d03241f639eeaf6769fb59c8c99664c 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9baad34a9793e5067d11289ece2154ba87b388af 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 77d951d13b8d8ad00af40257fe51623cc2caa61a 
>   core/src/main/scala/kafka/utils/Utils.scala 
> 738c1af9ef5de16fdf5130daab69757a14c48b5c 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
