I feel that it's simpler to keep the format of the checkpoint file as it is and add a separate checkpoint file for the low watermark. The low watermark and high watermark are maintained independently, so I'm not sure there is a significant benefit to storing them together.
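To illustrate Jun's suggestion, the low watermarks would go into their own file in the same layout the existing checkpoint files use. The sketch below is a minimal, hypothetical version. It assumes the v0 layout is a version line, then an entry-count line, then one "topic partition offset" line per partition. The class and method names are invented for illustration, and so is the TopicPartition stand-in. The file name replication-low-watermark-checkpoint is the one mentioned later in this thread.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: low watermarks kept in their own checkpoint file,
// written in the same (assumed) v0 layout as the existing checkpoint files.
final class LowWatermarkCheckpoint {
    static final int VERSION = 0;

    // Minimal stand-in for Kafka's TopicPartition, to keep the example self-contained.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Version line, entry count, then one "topic partition offset" line per entry.
    static String format(Map<TopicPartition, Long> offsets) {
        StringBuilder sb = new StringBuilder();
        sb.append(VERSION).append('\n');
        sb.append(offsets.size()).append('\n');
        for (Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
            sb.append(e.getKey().topic).append(' ')
              .append(e.getKey().partition).append(' ')
              .append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    // Write to a temporary sibling file, then rename it over the target so that
    // readers never see a partially written checkpoint. (A real implementation
    // would also fsync before renaming.)
    static void write(Path file, Map<TopicPartition, Long> offsets) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, format(offsets).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Each file is written independently. A crash between the two writes can therefore leave the high-watermark and low-watermark files out of step, which is the concern Guozhang raises further down the thread.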
Looking at the KIP again, I have another question about the API. Is there any benefit to returning a Future from the purgeDataBefore() API? Admin APIs are used infrequently, so it seems simpler to have a blocking API that returns Map<TopicPartition, PurgeDataResult>?

Thanks,
Jun

On Sun, Jan 22, 2017 at 3:56 PM, Dong Lin <lindon...@gmail.com> wrote:

Thanks for the comment, Guozhang. Please don't worry about being late. I would like to update the KIP if the new approach has a clear benefit. I am wondering if there are any use cases or operational aspects that would benefit from it.

I am not saying that these checkpoint files have the same priority. I mentioned the other checkpoint files to suggest that it is OK to add one more. To me, three checkpoint files are not much different from four. I am just inclined not to update the KIP if the only benefit is avoiding a new checkpoint file.

On Sun, Jan 22, 2017 at 3:48 PM, Guozhang Wang <wangg...@gmail.com> wrote:

To me, the distinction between the recovery checkpoint and the replication checkpoint is different from the distinction between these two watermark checkpoint values.

Consider a broker that starts up and acts as the leader for a partition. It can live without the recovery checkpoint: it just cannot rely on the existing last log segment and needs to fetch from other replicas. A missing replication checkpoint file, however, is a correctness issue. The broker would not know where to truncate its data or how to respond to a fetch request. That is why I think we can separate these two types of files, since the latter is more important than the former.

That being said, I do not want to call another vote on this, since it is my fault for not responding before the vote was called.
I just wanted to point out for the record that this approach may lead to operational scenarios where one of the replication files is missing and we need to handle that case specifically.

Guozhang

On Sun, Jan 22, 2017 at 1:56 PM, Dong Lin <lindon...@gmail.com> wrote:

Yeah, your solution of adding new APIs certainly works, and I don't think that is an issue. On the other hand, I don't think adding a new checkpoint file is an issue either, since we already have multiple checkpoint files. The benefit you mentioned is probably not a concern for the current approach, because the high watermark and low watermark work completely independently. Since there is no strong reason to choose either one, I am inclined to pick the one that requires fewer format changes and is simpler in the Java API. On that minor point, the current approach seems better.

If you feel strongly that we should use the new approach, I can do that as well. Please let me know if so. I would then need to ask Jun/Joel/Becket to vote on this again, since it changes the interface of the KIP.
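Guozhang's alternative, quoted below, keeps a single replication-offset-checkpoint file and bumps its version so that each line carries both watermarks. The sketch below is a minimal, hypothetical reader for both layouts, and it rests on several assumptions:

- v0 lines are "topic partition hwm".
- v1 lines are "topic partition lwm hwm", as in his proposal.
- -1L stands in for his NO_AVAILABLE value.

It names the two offsets explicitly instead of using generic first/second accessors, because the single v0 value is the high watermark while the first v1 value is the low watermark.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reader for a versioned replication-offset-checkpoint file:
//   v0: "0", entry count, then "topic partition hwm" lines
//   v1: "1", entry count, then "topic partition lwm hwm" lines
final class VersionedCheckpoint {
    // Assumed stand-in for the NO_AVAILABLE value in Guozhang's sketch.
    static final long NOT_AVAILABLE = -1L;

    static final class Entry {
        final long lowWatermark;
        final long highWatermark;

        Entry(long lowWatermark, long highWatermark) {
            this.lowWatermark = lowWatermark;
            this.highWatermark = highWatermark;
        }
    }

    final int version;
    final Map<String, Entry> entries; // keyed by "topic-partition"

    private VersionedCheckpoint(int version, Map<String, Entry> entries) {
        this.version = version;
        this.entries = entries;
    }

    // Parse the file contents, choosing the line layout from the version header.
    static VersionedCheckpoint parse(String contents) {
        String[] lines = contents.split("\n");
        if (lines.length < 2) {
            throw new IllegalArgumentException("missing version or entry-count line");
        }
        int version = Integer.parseInt(lines[0].trim());
        int fieldsPerLine;
        if (version == 0) {
            fieldsPerLine = 3;
        } else if (version == 1) {
            fieldsPerLine = 4;
        } else {
            throw new IllegalArgumentException("unrecognized checkpoint version " + version);
        }
        int expected = Integer.parseInt(lines[1].trim());
        Map<String, Entry> entries = new LinkedHashMap<>();
        for (int i = 2; i < lines.length; i++) {
            String[] f = lines[i].trim().split("\\s+");
            if (f.length != fieldsPerLine) {
                throw new IllegalArgumentException("malformed v" + version + " line: " + lines[i]);
            }
            String key = f[0] + "-" + Integer.parseInt(f[1]);
            Entry entry = version == 0
                    ? new Entry(NOT_AVAILABLE, Long.parseLong(f[2])) // v0 holds only the high watermark
                    : new Entry(Long.parseLong(f[2]), Long.parseLong(f[3]));
            entries.put(key, entry);
        }
        if (entries.size() != expected) {
            throw new IllegalArgumentException(
                    "expected " + expected + " entries but found " + entries.size());
        }
        return new VersionedCheckpoint(version, entries);
    }
}
```

This also shows the cost Dong describes. The recovery-point and log-cleaner checkpoints would still use the single-offset v0 layout, so a shared OffsetCheckpoint class would have to support both shapes.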
On Sun, Jan 22, 2017 at 9:35 AM, Guozhang Wang <wangg...@gmail.com> wrote:

I think this is less of an issue: we can use the same pattern as in the request protocol, i.e.:

    write(Map[TP, Long])               // write the checkpoint in v0 format
    write(Map[TP, Pair[Long, Long]])   // write the checkpoint in v1 format

    CheckpointedOffsets read()         // read the file based on its version id

    class CheckpointedOffsets {
        Integer getVersion();
        Long getFirstOffset();
        Long getSecondOffset();   // would return NO_AVAILABLE with v0 format
    }

Thinking about it more, there is another benefit. We won't end up with a partition that has only one of the watermarks if a failure happens between writing the two files.

Guozhang

On Sun, Jan 22, 2017 at 12:03 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Guozhang,

Thanks for the review :) Yes, it is possible to combine them. Both solutions will have the same performance, but I think the current solution gives us a simpler Java class design.

Combining them would mean changing the Java API (e.g. read() and write()) of the OffsetCheckpoint class so that it maps each TopicPartition to a pair of integers when writing the checkpoint file. That makes the class less generic. The log recovery checkpoint and the log cleaner checkpoint also use OffsetCheckpoint, and neither needs that API.

Dong

On Sat, Jan 21, 2017 at 12:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Dong,

Sorry for being late in reviewing this KIP.
It LGTM overall, but I'm wondering if we can avoid adding the "replication-low-watermark-checkpoint" file. We could instead bump up the version number of "replication-offset-checkpoint" so that it holds two values for each partition, i.e.:

    1   // version number
    [number of partitions]
    [topic name] [partition id] [lwm] [hwm]

This will affect the upgrade path a bit, but I think not by much, and no other logic will be affected.

Guozhang

On Wed, Jan 18, 2017 at 6:12 PM, Dong Lin <lindon...@gmail.com> wrote:

Thanks to everyone who voted and provided feedback!

This KIP is now adopted with 3 binding +1s (Jun, Joel, Becket) and 2 non-binding +1s (Radai, Mayuresh).

Thanks,
Dong

On Wed, Jan 18, 2017 at 6:05 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the update. +1

Jun

On Wed, Jan 18, 2017 at 1:44 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi Jun,

After some more thinking, I agree with you. If offsetToPurge is larger than high_watermark, it is better to simply throw OffsetOutOfRangeException and not update low_watermark.
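The rule Dong settles on here can be sketched as a small leader-side check. The sketch also covers the -1L sentinel he describes a couple of paragraphs later. The class and method names are hypothetical, and the exception is a local stand-in for Kafka's OffsetOutOfRangeException. Treating any other negative offset as out of range is an assumption of this sketch, not something the thread specifies.

```java
// Hypothetical leader-side check for a PurgeRequest under the rule agreed in this
// thread: the low watermark may never move past the high watermark.
final class PurgeOffsetCheck {
    // Per the updated KIP, -1L in a PurgeRequest stands for the partition's high_watermark.
    static final long HIGH_WATERMARK_SENTINEL = -1L;

    // Local stand-in for Kafka's OffsetOutOfRangeException.
    static final class OffsetOutOfRangeException extends RuntimeException {
        OffsetOutOfRangeException(String message) {
            super(message);
        }
    }

    // Returns the low watermark the partition should end up with. Throws instead if the
    // requested offset is beyond the high watermark, in which case the caller leaves the
    // low watermark as it is.
    static long newLowWatermark(long offsetToPurge, long lowWatermark, long highWatermark) {
        long target = offsetToPurge == HIGH_WATERMARK_SENTINEL ? highWatermark : offsetToPurge;
        // Other negative values (e.g. the -2L idea floated for the future) are rejected here.
        if (target < 0 || target > highWatermark) {
            throw new OffsetOutOfRangeException(
                    "offsetToPurge " + offsetToPurge + " is outside [0, " + highWatermark + "]");
        }
        // A request below the current low watermark does not move it backwards.
        return Math.max(lowWatermark, target);
    }
}
```

Returning the larger of the current and requested offsets matches case (a) in Dong's earlier answer below. There, a request below lowWatermark is answered immediately with the existing lowWatermark.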
In 2(b), my use case for allowing low_watermark > high_watermark was to let the user purge all the data in the log, even data not yet fully replicated to followers. The user may see an offset higher than high_watermark in two ways: through the producer's RecordMetadata, or through ListOffsetResponse when the from_consumer option is false. However, this may cause problems with unclean leader election, or when a consumer seeks to the largest offset of the partition. Addressing these two problems would complicate this KIP.

For now I prefer to keep this KIP simple by requiring low_watermark <= high_watermark. The caveat concerns a user who does want to purge *all* the data already produced. That user needs to stop all producers writing to this topic, wait long enough for all followers to catch up, and then purge data using the latest offset of this partition, i.e. high_watermark. We can revisit this if a strong use case comes up in the future.

I also updated the KIP to allow the user to pass offset -1L in the PurgeRequest to indicate high_watermark.
In the future we can allow users to pass offset -2L to purge all data up to logEndOffset.

Thanks!
Dong

On Wed, Jan 18, 2017 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

For 2(b), it seems a bit weird to allow highWatermark to be smaller than lowWatermark. Also, from the consumer's perspective, messages are available only up to highWatermark. What if we simply throw OffsetOutOfRangeException if offsetToPurge is larger than highWatermark?

Thanks,

Jun

On Tue, Jan 17, 2017 at 9:54 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi Jun,

Thank you. Please see my answers below. I have updated the KIP to answer these questions (see here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=5&selectedPageVersions=6>).

1. Yes, in this KIP we wait for all replicas. This is the same as a producer sending a message with ack=all when isr=all_replicas. So the comparison seems OK?
2. Good point! I hadn't thought about the case where the user-specified offset > logEndOffset. Please see the answers below.

a) If offsetToPurge < lowWatermark, the first condition of DelayedOperationPurgatory will be satisfied immediately when the broker receives the PurgeRequest. The broker will send a PurgeResponse to the admin client immediately. The response maps this partition to the lowWatermark.

This case is covered as the first condition of DelayedOperationPurgatory in the current KIP.

b) If highWatermark < offsetToPurge < logEndOffset, the exchange goes as follows:

- The leader sends a FetchResponse with low_watermark=offsetToPurge.
- The follower records offsetToPurge as its low_watermark and sends a FetchRequest with the new low_watermark to the leader.
- The leader then sends the admin client a PurgeResponse that maps this partition to the new low_watermark.

The data in the range [highWatermark, offsetToPurge] will still be appended from the leader to the followers, but will not be exposed to consumers. For a short period of time, the low_watermark on the followers will be higher than their highWatermark.
This case is also covered in the current KIP, so no change is required.

c) If logEndOffset < offsetToPurge, the leader will send a PurgeResponse to the admin client immediately. The response maps this partition to OffsetOutOfRangeException.

The current KIP does not cover this case. I just added it to the Proposed Change section as the second condition for removing the PurgeRequest from DelayedOperationPurgatory. Since the PurgeRequest is satisfied as soon as the leader receives it, it won't actually be put into the DelayedOperationPurgatory.

3. Yes, lowWatermark will be used when smallest_offset is used in the ListOffsetRequest. I just updated the Proposed Change section to specify this.

Thanks,
Dong

On Tue, Jan 17, 2017 at 6:53 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the KIP. Looks good overall. Just a few more comments.

1. "Note that the way broker handles PurgeRequest is similar to how it handles ProduceRequest with ack = -1 and isr=all_replicas".
It seems that the implementation is a bit different. In this KIP, we wait for all replicas, whereas for the producer, acks=all means waiting for all in-sync replicas.

2. Could you describe the behavior when the specified offsetToPurge is (a) smaller than lowWatermark, (b) larger than highWatermark but smaller than the log end offset, or (c) larger than the log end offset?

3. In the ListOffsetRequest, will lowWatermark be returned when the smallest_offset option is used?

Thanks,

Jun

On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi all,

There seem to be no further concerns with KIP-107. At this point we would like to start the voting process. The KIP can be found at https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient.
Thanks,
Dong