Hi Dong,

Sorry for being late in reviewing this KIP. It LGTM overall, but I'm wondering if we can avoid adding the "replication-low-watermark-checkpoint" file by just bumping up the version number of "replication-offset-checkpoint" to let it have two values for each partition, i.e.:

    1                                         // version number
    [number of partitions]
    [topic name] [partition id] [lwm] [hwm]   // one line per partition

This will affect the upgrade path a bit, but I think not by much, and all other logic will not be affected.

Guozhang
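For illustration, here is a minimal sketch of how such a two-value (version 1) checkpoint file could be written and read. The class and method names below are hypothetical and only meant to show the proposed layout, not Kafka's actual checkpoint code:

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of the proposed version-1 layout:
    //   line 1: version number
    //   line 2: number of partitions
    //   then one line per partition: "<topic> <partition> <lwm> <hwm>"
    public class TwoValueCheckpointSketch {

        public static void write(Path file, Map<String, long[]> offsets) throws IOException {
            try (BufferedWriter out = Files.newBufferedWriter(file)) {
                out.write("1");                               // version number
                out.newLine();
                out.write(Integer.toString(offsets.size()));  // number of partitions
                out.newLine();
                for (Map.Entry<String, long[]> e : offsets.entrySet()) {
                    // key is "<topic> <partition>", value is {lwm, hwm}
                    out.write(e.getKey() + " " + e.getValue()[0] + " " + e.getValue()[1]);
                    out.newLine();
                }
            }
        }

        public static Map<String, long[]> read(Path file) throws IOException {
            List<String> lines = Files.readAllLines(file);
            int version = Integer.parseInt(lines.get(0).trim());
            if (version != 1)
                throw new IOException("Unrecognized checkpoint version " + version);
            int count = Integer.parseInt(lines.get(1).trim());
            Map<String, long[]> offsets = new HashMap<>();
            for (int i = 0; i < count; i++) {
                String[] parts = lines.get(2 + i).split(" ");
                offsets.put(parts[0] + " " + parts[1],
                            new long[]{Long.parseLong(parts[2]), Long.parseLong(parts[3])});
            }
            return offsets;
        }
    }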
On Wed, Jan 18, 2017 at 6:12 PM, Dong Lin <lindon...@gmail.com> wrote:

> Thanks to everyone who voted and provided feedback!
>
> This KIP is now adopted with 3 binding +1s (Jun, Joel, Becket) and 2
> non-binding +1s (Radai, Mayuresh).
>
> Thanks,
> Dong
>
> On Wed, Jan 18, 2017 at 6:05 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Thanks for the update. +1
> >
> > Jun
> >
> > On Wed, Jan 18, 2017 at 1:44 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > After some more thinking, I agree with you that it is better to simply
> > > throw OffsetOutOfRangeException and not update low_watermark if
> > > offsetToPurge is larger than high_watermark.
> > >
> > > My use-case of allowing low_watermark > high_watermark in 2(b) is to
> > > allow the user to purge all the data in the log even if that data is
> > > not fully replicated to followers. An offset higher than high_watermark
> > > may be returned to the user either through the producer's
> > > RecordMetadata, or through ListOffsetResponse if the from_consumer
> > > option is false. However, this may cause problems in case of unclean
> > > leader election or when a consumer seeks to the largest offset of the
> > > partition. It would complicate this KIP if we were to address these two
> > > problems.
> > >
> > > At this moment I prefer to keep this KIP simple by requiring
> > > low_watermark <= high_watermark. The caveat is that if a user does want
> > > to purge *all* the data that has already been produced, then he needs
> > > to stop all producers that are producing into this topic, wait long
> > > enough for all followers to catch up, and then purge data using the
> > > latest offset of this partition, i.e. high_watermark. We can revisit
> > > this if some strong use-case comes up in the future.
> > >
> > > I also updated the KIP to allow the user to use offset -1L to indicate
> > > high_watermark in the PurgeRequest. In the future we can allow users to
> > > use offset -2L to indicate that they want to purge all data up to
> > > logEndOffset.
> > >
> > > Thanks!
> > > Dong
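For illustration, here is a hypothetical sketch of what calling the proposed API with the -1L sentinel could look like. The PurgeAdminClient interface and the purgeDataBefore() signature below are placeholders standing in for the API described in KIP-107, not the final code (the real call would likely be asynchronous):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical usage sketch; the interface stands in for the
    // purgeDataBefore() API proposed in KIP-107 and may differ from it.
    public class PurgeDataBeforeExample {

        interface PurgeAdminClient {
            // Maps each partition to the offset before which data should be purged,
            // and returns the resulting low_watermark per partition.
            Map<TopicPartition, Long> purgeDataBefore(Map<TopicPartition, Long> offsets);
        }

        static void purge(PurgeAdminClient admin) {
            Map<TopicPartition, Long> request = new HashMap<>();
            // Purge everything before offset 5000 on partition 0.
            request.put(new TopicPartition("my-topic", 0), 5000L);
            // -1L is the sentinel meaning "purge up to the current high_watermark".
            request.put(new TopicPartition("my-topic", 1), -1L);

            Map<TopicPartition, Long> lowWatermarks = admin.purgeDataBefore(request);
            lowWatermarks.forEach((tp, lwm) ->
                System.out.println(tp + " low_watermark is now " + lwm));
        }
    }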
> > > On Wed, Jan 18, 2017 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > For 2(b), it seems a bit weird to allow highWatermark to be smaller
> > > > than lowWatermark. Also, from the consumer's perspective, messages
> > > > are available only up to highWatermark. What if we simply throw
> > > > OffsetOutOfRangeException if offsetToPurge is larger than
> > > > highWatermark?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Jan 17, 2017 at 9:54 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thank you. Please see my answers below. The KIP is updated to answer
> > > > > these questions (see here
> > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=5&selectedPageVersions=6>).
> > > > >
> > > > > 1. Yes, in this KIP we wait for all replicas. This is the same as if
> > > > > the producer sends a message with ack=all and isr=all_replicas. So
> > > > > it seems that the comparison is OK?
> > > > >
> > > > > 2. Good point! I hadn't thought about the case where the
> > > > > user-specified offset > logEndOffset. Please see the answers below.
> > > > >
> > > > > a) If offsetToPurge < lowWatermark, the first condition of
> > > > > DelayedOperationPurgatory will be satisfied immediately when the
> > > > > broker receives the PurgeRequest. The broker will send a
> > > > > PurgeResponse to the admin client immediately. The response maps
> > > > > this partition to the lowWatermark.
> > > > >
> > > > > This case is covered as the first condition of
> > > > > DelayedOperationPurgatory in the current KIP.
> > > > >
> > > > > b) If highWatermark < offsetToPurge < logEndOffset, the leader will
> > > > > send a FetchResponse with low_watermark=offsetToPurge. The follower
> > > > > records the offsetToPurge as low_watermark and sends a FetchRequest
> > > > > to the leader with the new low_watermark. The leader will then send
> > > > > a PurgeResponse to the admin client which maps this partition to the
> > > > > new low_watermark. The data in the range [highWatermark,
> > > > > offsetToPurge] will still be appended from the leader to followers
> > > > > but will not be exposed to consumers. And for a short period of time
> > > > > the low_watermark on the followers will be higher than their
> > > > > highWatermark.
> > > > >
> > > > > This case is also covered in the current KIP, so no change is
> > > > > required.
> > > > >
> > > > > c) If logEndOffset < offsetToPurge, the leader will send a
> > > > > PurgeResponse to the admin client immediately. The response maps
> > > > > this partition to OffsetOutOfRangeException.
> > > > >
> > > > > This case is not covered by the current KIP. I just added this as
> > > > > the second condition for the PurgeRequest to be removed from
> > > > > DelayedOperationPurgatory (in the Proposed Change section). Since
> > > > > the PurgeRequest is satisfied immediately when the leader receives
> > > > > it, it actually won't be put into the DelayedOperationPurgatory.
> > > > >
> > > > > 3. Yes, lowWatermark will be used when smallest_offset is used in
> > > > > the ListOffsetRequest. I just updated the Proposed Change section to
> > > > > specify this.
> > > > >
> > > > > Thanks,
> > > > > Dong
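For illustration, the three cases above could be summarized by leader-side classification logic roughly like the following. The class, field, and exception choices are made up for the example, and note that the later follow-up (the Jan 18 reply further up the thread) tightens this so that offsets larger than highWatermark are rejected as well:

    // Simplified, hypothetical sketch of how a leader could classify an incoming
    // offsetToPurge, mirroring cases (a), (b) and (c) above. Names are illustrative;
    // the real broker would return OffsetOutOfRangeException rather than throw here.
    final class PurgeCaseSketch {
        long lowWatermark;
        long highWatermark;
        long logEndOffset;

        /** Returns the low_watermark to report back, or throws for case (c). */
        long handlePurgeRequest(long offsetToPurge) {
            if (offsetToPurge > logEndOffset) {
                // Case (c): respond immediately with an out-of-range error.
                throw new IllegalArgumentException(
                    "offsetToPurge " + offsetToPurge + " > logEndOffset " + logEndOffset);
            }
            if (offsetToPurge < lowWatermark) {
                // Case (a): nothing to purge; respond immediately with the current
                // lowWatermark.
                return lowWatermark;
            }
            // Case (b) and the ordinary case: advance low_watermark to offsetToPurge.
            // The PurgeResponse is sent only after all followers have acknowledged
            // the new low_watermark through their FetchRequests.
            lowWatermark = offsetToPurge;
            return lowWatermark;
        }
    }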
> > > > > On Tue, Jan 17, 2017 at 6:53 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Dong,
> > > > > >
> > > > > > Thanks for the KIP. Looks good overall. Just a few more comments.
> > > > > >
> > > > > > 1. "Note that the way broker handles PurgeRequest is similar to
> > > > > > how it handles ProduceRequest with ack = -1 and isr=all_replicas".
> > > > > > It seems that the implementation is a bit different. In this KIP,
> > > > > > we wait for all replicas. But in the producer, acks=all means
> > > > > > waiting for all in-sync replicas.
> > > > > >
> > > > > > 2. Could you describe the behavior when the specified
> > > > > > offsetToPurge is (a) smaller than lowWatermark, (b) larger than
> > > > > > highWatermark but smaller than the log end offset, (c) larger than
> > > > > > the log end offset?
> > > > > >
> > > > > > 3. In the ListOffsetRequest, will lowWatermark be returned when
> > > > > > the smallest_offset option is used?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > It seems that there is no further concern with KIP-107. At this
> > > > > > > point we would like to start the voting process. The KIP can be
> > > > > > > found at
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong

--
-- Guozhang