> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529>
> >
> >     Does that imply there is always an offset? Is that always true?
> >     
> >     I don't quite follow how the logic works. Since the offset for each 
> > target partition is updated independently, I am not sure why you can rely 
> > on checking that those offsets are consecutive. Also, does this logic work 
> > when there is a partitioning key?
> >     
> >     It would be useful to add some comments to describe why a two-level 
> > offset map is needed.
> 
> Jiangjie Qin wrote:
>     The iterator only iterates over the keys that exist in the map, i.e. 
> an offset always exists.
>     
>     It does seem confusing without explanation... Will it be clearer if the 
> following comments are added?
>     
>     Following is the offset commit logic:
>     We know that:
>     1. Messages from the same source partition end up in the same data 
> channel queue in order and will be sent by the same producer.
>     2. Messages sent by the same producer could go to different target 
> partitions even if the messages are from the same source partition. 
>     3. The order is only guaranteed for messages sent to the same target 
> partition. That means a (SourceTopicPartition, TargetPartition) combination 
> is needed.
>     4. For each (SourceTopicPartition, TargetPartition), keeping track of a 
> single offset is sufficient, because if an offset is acked, all the offsets 
> smaller than that offset going to the same target partition must have been 
> sent successfully (MaxInFlightRequest=1). That said, if we have multiple 
> producers, after sorting all the last acked offsets of the target partitions 
> that correspond to the same source partition, we can commit the offsets 
> from the smallest until the acked offset is no longer consecutive. 
> (But we do need to set send retries to be infinite in the producer config, 
> otherwise this won't work. I'll add it to the comments.)
>     
>     Based on the above logic, we could use Map<(SourceTopicPartition, 
> TargetPartition), offset> to track the offset. But because the offset commit 
> is based on source topic partitions, it is easier to have a 
> Map<SourceTopicPartition, Map<TargetPartition, offset>> to find the offset to 
> commit. That's why there is a 2-level map.
>     
>     The logic above does not rely on any message key, so it works in general 
> for both keyed/non-keyed messages.

The part that's not clear to me is "until the acked offset is no longer 
consecutive". Suppose there are 2 target partitions. Offsets 1, 2, 4 and 5 can 
be routed to partition 1 and offset 3 can be routed to partition 2. When all 
messages are sent, you will see partition 1 with offset 5 and partition 2 with 
offset 3. Since offsets 3 and 5 are not consecutive, are you just going to 
commit offset 3? If so, you may be stuck at offset 3 forever if no new messages 
are sent to partition 2.

Also, does this work with a log configured with compact? Such a log may not 
have consecutive offsets to begin with.
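
To make both concerns concrete, here is a minimal sketch in Java of the commit 
rule as I read it from the description above. It is not the code in the patch: 
the class and method names (OffsetCommitSketch, onAck, committableOffset) and 
the use of a String for the source partition are assumptions made only for 
illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the two-level offset map; not the MirrorMaker code.
final class OffsetCommitSketch {
    // source partition -> (target partition -> last acked source offset)
    private final Map<String, Map<Integer, Long>> acked = new HashMap<>();

    // Record that the message at sourceOffset was acked by targetPartition.
    void onAck(String sourcePartition, int targetPartition, long sourceOffset) {
        acked.computeIfAbsent(sourcePartition, k -> new HashMap<>())
             .merge(targetPartition, sourceOffset, Long::max);
    }

    // Sort the last acked offsets of all target partitions and walk up from the
    // smallest while each next offset is exactly previous + 1.
    // Returns -1 if nothing has been acked for this source partition.
    long committableOffset(String sourcePartition) {
        Map<Integer, Long> perTarget = acked.get(sourcePartition);
        if (perTarget == null || perTarget.isEmpty()) {
            return -1L;
        }
        List<Long> sorted = new ArrayList<>(perTarget.values());
        Collections.sort(sorted);
        long commit = sorted.get(0);
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i) != commit + 1) {
                break; // gap found: stop at the last consecutive offset
            }
            commit = sorted.get(i);
        }
        return commit;
    }
}
```

With offsets 1, 2, 4, 5 acked on target partition 1 and offset 3 acked on 
target partition 2, this rule returns 3 even though everything up to 5 has 
been acked. With a compacted source log that only contains offsets 1, 2, 5, 6 
(1 and 2 sent to target partition 1, 5 and 6 sent to target partition 2), it 
returns 2 for the same reason.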


> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 614-618
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line614>
> >
> >     This kind of coordination doesn't quite work. Suppose that we set 
> > inRebalance to true and entered the while loop. However, just before we 
> > call inRebalance.wait(), the producer could have finished sending all data 
> > and called inRebalance.notify(). Then we will be stuck in 
> > inRebalance.wait() forever since we missed the notification.
> >     
> >     One way to do that is to create a lock that protects the read/write of 
> > numMessageUnacked. Then we use a condition created from the lock to do the 
> > coordination. This way, both the wait/notification and the update/check of 
> > numMessageUnacked are protected by the same lock.
> 
> Jiangjie Qin wrote:
>     Yes, it is clearer to just use a single lock. I'll change the code.
>     
>     But the current code itself seems to work, please correct me if I'm wrong.
>     In this case we essentially guarantee that [enable notify, 
> numMessageUnacked > 0 then wait] is atomic with respect to notify. So either 
> notify is not enabled yet or it occurs after the rebalance listener starts to 
> wait. So the notify will not be missed.
>     If the rebalance listener grabs the inRebalance lock first and is in the 
> while loop, then it won't release the inRebalance lock until it enters wait. 
> Because the producer has to grab the inRebalance lock before calling 
> inRebalance.notify(), the producer will not be able to call 
> inRebalance.notify() until the offset commit thread starts to wait. 
>     There are some more complicated sequences when producers call notify 
> multiple times, but as long as 
>     1.[numMessageUnacked > 0 then wait] is atomic with respect to notify, and
>     2.[numMessageUnacked = 0] happens before notify
>     notify should not be missed.

Got it. I missed the part that we synchronize on inRebalance before calling 
notify(). So, it does seem to work. However, it depends on the current ordering 
in the code. It's simpler to reason about if the access to the shared values 
(numMessageUnacked, inRebalance) and the wait/notify are protected under the 
same synchronization point together.
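
As a rough sketch of the single-lock approach, assuming Java and 
java.util.concurrent.locks: the counter and the condition share one lock, so 
the check and the wait cannot interleave with the decrement and the signal. 
The class and method names (UnackedTracker, onSend, onAck, awaitAllAcked) are 
hypothetical, and the timeout is only there so that a caller cannot block 
forever; the patch itself may look different.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; not the MirrorMaker code.
final class UnackedTracker {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition allAcked = lock.newCondition();
    private int numMessageUnacked = 0; // guarded by lock

    // Called by the producer thread before each send.
    void onSend() {
        lock.lock();
        try {
            numMessageUnacked++;
        } finally {
            lock.unlock();
        }
    }

    // Called from the send callback, whether or not the send succeeded.
    void onAck() {
        lock.lock();
        try {
            if (--numMessageUnacked == 0) {
                allAcked.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Called by the rebalance listener. Returns true once no message is
    // unacked, false on timeout or interruption.
    boolean awaitAllAcked(long timeoutMillis) {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // The counter is re-checked under the lock, so a signal sent
            // before we start waiting is not lost: we simply never wait.
            while (numMessageUnacked > 0) {
                if (remaining <= 0L) {
                    return false;
                }
                remaining = allAcked.awaitNanos(remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    int unacked() {
        lock.lock();
        try {
            return numMessageUnacked;
        } finally {
            lock.unlock();
        }
    }
}
```

The rebalance listener would call awaitAllAcked(...) before committing 
offsets, and the producer callback would call onAck() for every completed 
send.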


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
-----------------------------------------------------------


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 7, 2014, 2:59 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented whether or not the send was 
> successful.
> 
> 
> Addressed Jun's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
