> On Dec. 6, 2014, 5:17 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 529-530
> > <https://reviews.apache.org/r/25995/diff/10/?file=782732#file782732line529>
> >
> >     Does that imply there is always an offset? Is that always true?
> >     
> >     I don't quite follow how the logic works. Since the offset for each 
> > target partition is updated independently, I am not sure why you can rely 
> > on checking that those offsets are consecutive. Also, does this logic work 
> > when there is a partitioning key?
> >     
> >     It would be useful to add some comments to describe why a two-level 
> > offset map is needed.
> 
> Jiangjie Qin wrote:
> >     The iterator only iterates over the keys that exist in the map, i.e. 
> > keys for which an offset exists.
>     
>     It does seem confusing without explanation... Will it be clearer if the 
> following comments are added?
>     
>     Following is the offset commit logic:
>     We know that:
>     1. Messages from the same source partition end up in the same data 
> channel queue in order and will be sent by the same producer.
>     2. Messages sent by the same producer could go to different target 
> partitions even if the messages are from the same source partition. 
>     3. The order is only guaranteed for messages sent to the same target 
> partition. That means a (SourceTopicPartition, TargetPartition) combination 
> is needed.
>     4. For each (SourceTopicPartition, TargetPartition) pair, keeping track 
> of a single offset is sufficient, because if an offset is acked, all the 
> offsets smaller than that offset going to the same target partition must 
> have been sent successfully (MaxInFlightRequest=1). Therefore, if we have 
> multiple producers, after sorting all the last acked offsets of the target 
> partitions corresponding to the same source partition, we can commit the 
> offsets from the smallest until the acked offsets are no longer consecutive. 
> (But we do need to set send retries to be infinite in the producer config, 
> otherwise this won't work. I'll add it to the comments.)
>     
>     Based on the above logic, we could use a Map<(SourceTopicPartition, 
> TargetPartition), offset> to track the offsets. But because the offset 
> commit is based on source topic partitions, it is easier to use a 
> Map<SourceTopicPartition, Map<TargetPartition, offset>> to find the offset 
> to commit. That's why there is a two-level map.
>     
>     The logic above does not rely on any message key, so it works for both 
> keyed and non-keyed messages.
> 
> Jun Rao wrote:
>     The part that's not clear to me is "until the acked offset is no longer 
> consecutive". Suppose there are 2 target partitions. Offsets 1, 2, 4, and 5 
> can be routed to partition 1 and offset 3 can be routed to partition 2. 
> When all messages are sent, you will see partition 1 with offset 5 and 
> partition 2 with offset 3. Since offsets 3 and 5 are not consecutive, are 
> you just going to commit offset 3? If so, you may be stuck at offset 3 
> forever if no new messages are sent to partition 2.
>     
>     Also, does this work with a log configured with compaction? Such a log 
> may not have consecutive offsets to begin with.
> 
> Jiangjie Qin wrote:
>     I see, you are right. In that case it won't work. How about the 
> following algorithm:
>     1. We keep a Map<SourceTopicPartition, (Set<Offset>, MaxOffsetInSet)>.
>     2. When a message is sent, the producer puts its offset into the map 
> based on the source topic partition, and updates MaxOffsetInSet if 
> necessary.
>     3. In the producer callback, remove the offset from the Set<Offset> if 
> the message was sent successfully.
>     4. When the offset commit thread runs, it gets the smallest offset in 
> the Set<Offset> and commits it. If the set is empty, it commits 
> MaxOffsetInSet+1.
>     
>     This algorithm should be able to handle compacted partitions as well.
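The four steps above can be sketched as follows (hypothetical class, one instance per source topic-partition; not from the actual patch). The Collections.min scan over a plain hash set is the cost Jun's reply addresses:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the Set + MaxOffsetInSet idea; one tracker per
// source topic-partition. Identifiers are illustrative.
public class PendingOffsetSet {
    private final Set<Long> pending = new HashSet<>(); // sent but not yet acked
    private long maxOffsetInSet = -1L;

    // Step 2: called when a message is handed to the producer.
    public synchronized void onSend(long offset) {
        pending.add(offset);
        if (offset > maxOffsetInSet) maxOffsetInSet = offset;
    }

    // Step 3: called from the producer callback on a successful send.
    public synchronized void onAck(long offset) {
        pending.remove(offset);
    }

    // Step 4: called by the offset commit thread.
    public synchronized long offsetToCommit() {
        if (pending.isEmpty()) return maxOffsetInSet + 1;
        return Collections.min(pending); // O(n) scan over the set
    }
}
```

Because only the smallest outstanding offset matters, gaps in the offset sequence (e.g. from a compacted source log) no longer cause the commit to stall.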

Yes, but getting the smallest offset from a set can be expensive. I was 
thinking that we can put the offsets of all outstanding messages in a linked 
list in source offset order. Every successfully produced message will be 
removed from the linked list. We also maintain a maxOffsetSeen. The commit 
offset will be the offset in the first link if the linked list is not empty. 
Otherwise, it will be maxOffsetSeen.
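A hedged sketch of that linked-list variant (illustrative names, not the actual patch), assuming each send keeps a handle to its list node so the ack callback can unlink it in O(1). The empty-list case here returns maxOffsetSeen + 1, following the commit-the-next-offset convention of step 4 above:

```java
// Hypothetical sketch: outstanding offsets kept in a doubly-linked list in
// source offset order; one tracker per source topic-partition.
public class OutstandingOffsetList {
    static final class Node {
        final long offset;
        Node prev, next;
        Node(long offset) { this.offset = offset; }
    }

    private final Node head = new Node(-1L); // sentinel
    private final Node tail = new Node(-1L); // sentinel
    private long maxOffsetSeen = -1L;

    public OutstandingOffsetList() { head.next = tail; tail.prev = head; }

    // Called in source offset order when a message is handed to the producer;
    // the returned node is kept so the ack callback can remove it in O(1).
    public synchronized Node onSend(long offset) {
        Node n = new Node(offset);
        n.prev = tail.prev;
        n.next = tail;
        tail.prev.next = n;
        tail.prev = n;
        if (offset > maxOffsetSeen) maxOffsetSeen = offset;
        return n;
    }

    // Producer callback on a successful send: unlink the node.
    public synchronized void onAck(Node n) {
        n.prev.next = n.next;
        n.next.prev = n.prev;
    }

    // Smallest outstanding offset, or maxOffsetSeen + 1 when nothing is in flight.
    public synchronized long offsetToCommit() {
        return head.next == tail ? maxOffsetSeen + 1 : head.next.offset;
    }
}
```

Because sends happen in source offset order, the head of the list is always the smallest outstanding offset, so the commit thread pays O(1) instead of scanning a set.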


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
-----------------------------------------------------------


On Dec. 7, 2014, 2:59 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Dec. 7, 2014, 2:59 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>       core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>       
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> numMessageUnacked should be decremented no matter whether the send was 
> successful or not.
> 
> 
> Addressed Jun's comments.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
> 62c0686e816d2888772d5a911becf625eedee397 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
