Yeah, your solution of adding new APIs certainly works, and I don't think
that is an issue. On the other hand, I don't think it is an issue to add a
new checkpoint file either, since we already have multiple checkpoint
files. The problem that the new approach would avoid, i.e. a partition
having only one of the two watermarks after a failure between the two file
writes, is probably not an issue in the current approach, since the high
watermark and low watermark work completely independently. Since there is
no strong reason to choose either of them, I am inclined to choose the one
that requires less format change and is simpler in the Java API. The
current approach seems better w.r.t. this minor reason.
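
For concreteness, here is a rough sketch of what reading a combined,
versioned checkpoint file might involve. It assumes the v1 layout proposed
in the quoted message below ([topic name] [partition id] [lwm] [hwm]) and
the existing v0 layout with a single offset per partition. CheckpointEntry,
VersionedCheckpointReader and the NOT_AVAILABLE sentinel are hypothetical
names used only for illustration; they are not the actual OffsetCheckpoint
API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value type: the checkpointed offsets of one partition. */
final class CheckpointEntry {
    /** Assumed sentinel meaning "no low watermark recorded" (v0 files). */
    static final long NOT_AVAILABLE = -1L;

    final String topic;
    final int partition;
    final long lowWatermark;
    final long highWatermark;

    CheckpointEntry(String topic, int partition, long lowWatermark, long highWatermark) {
        this.topic = topic;
        this.partition = partition;
        this.lowWatermark = lowWatermark;
        this.highWatermark = highWatermark;
    }
}

/** Hypothetical reader that dispatches on the version line of the file. */
final class VersionedCheckpointReader {
    static List<CheckpointEntry> parse(List<String> lines) {
        if (lines.size() < 2) {
            throw new IllegalArgumentException("Expected a version line and an entry count");
        }
        int version = Integer.parseInt(lines.get(0).trim());
        int expectedEntries = Integer.parseInt(lines.get(1).trim());
        if (version != 0 && version != 1) {
            throw new IllegalArgumentException("Unrecognized checkpoint version " + version);
        }
        // v0: [topic] [partition] [hwm]; v1: [topic] [partition] [lwm] [hwm]
        int expectedFields = (version == 0) ? 3 : 4;

        List<CheckpointEntry> entries = new ArrayList<>();
        for (String line : lines.subList(2, lines.size())) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length != expectedFields) {
                throw new IllegalArgumentException("Malformed line: " + line);
            }
            String topic = fields[0];
            int partition = Integer.parseInt(fields[1]);
            // A v0 file carries no low watermark, so report the sentinel instead.
            long low = (version == 0) ? CheckpointEntry.NOT_AVAILABLE : Long.parseLong(fields[2]);
            long high = Long.parseLong(fields[expectedFields - 1]);
            entries.add(new CheckpointEntry(topic, partition, low, high));
        }
        if (entries.size() != expectedEntries) {
            throw new IllegalArgumentException(
                "Expected " + expectedEntries + " entries but found " + entries.size());
        }
        return entries;
    }
}
```

The point of the sketch is only that every caller of the reader now has to
deal with a second, possibly unavailable offset, which is the extra API
surface discussed above.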

If you feel strongly that we should use the new approach, I can do that as
well. Please let me know if you think so; I would then need to ask
Jun/Joel/Becket to vote on this again, since this changes the interface of
the KIP.

On Sun, Jan 22, 2017 at 9:35 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I think this is less of an issue: we can use the same patterns as in the
> request protocol, i.e.:
>
> write(Map[TP, Long]) // write the checkpoint in v0 format
> write(Map[TP, Pair[Long, Long]]) // write the checkpoint in v1 format
>
> CheckpointedOffsets read() // read the file relying on its version id
>
> class CheckpointedOffsets {
>
>     Integer getVersion();
>     Long getFirstOffset();
>     Long getSecondOffset();   // would return NO_AVAILABLE with v0 format
> }
>
>
> As I think of it, another benefit is that we won't have a partition that
> only has one of the watermarks in case of a failure in between writing the
> two files.
>
> Guozhang
>
> On Sun, Jan 22, 2017 at 12:03 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Guozhang,
> >
> > Thanks for the review :) Yes, it is possible to combine them. Both solutions
> > will have the same performance. But I think the current solution will give
> > us a simpler Java class design. Note that we will have to change the Java API
> > (e.g. read() and write()) of the OffsetCheckpoint class in order to provide a
> > map from TopicPartition to a pair of integers when we write to the checkpoint
> > file. This makes the class less generic, since this API is not used by the
> > log recovery checkpoint and log cleaner checkpoint, which also use the
> > OffsetCheckpoint class.
> >
> > Dong
> >
> >
> >
> >
> >
> >
> >
> > On Sat, Jan 21, 2017 at 12:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Dong,
> > >
> > > Sorry for being late in reviewing this KIP. It LGTM overall, but I'm
> > > wondering if we can avoid adding the "replication-low-watermark-checkpoint"
> > > file by just bumping up the version number of "replication-offset-checkpoint"
> > > to let it have two values for each partition, i.e.:
> > >
> > > 1  // version number
> > > [number of partitions]
> > > [topic name] [partition id] [lwm] [hwm]
> > >
> > >
> > > This will affect the upgrade path a bit, but I think not by much, and all
> > > other logic will not be affected.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Wed, Jan 18, 2017 at 6:12 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Thanks to everyone who voted and provided feedback!
> > > >
> > > > This KIP is now adopted with 3 binding +1s (Jun, Joel, Becket) and 2
> > > > non-binding +1s (Radai, Mayuresh).
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Wed, Jan 18, 2017 at 6:05 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Thanks for the update. +1
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Jan 18, 2017 at 1:44 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > After some more thinking, I agree with you that it is better to simply
> > > > > > throw OffsetOutOfRangeException and not update low_watermark if
> > > > > > offsetToPurge is larger than high_watermark.
> > > > > >
> > > > > > My use-case for allowing low_watermark > high_watermark in 2(b) is to
> > > > > > allow the user to purge all the data in the log even if that data is not
> > > > > > fully replicated to followers. An offset higher than high_watermark may
> > > > > > be returned to the user either through the producer's RecordMetadata, or
> > > > > > through ListOffsetResponse if the from_consumer option is false. However,
> > > > > > this may cause problems in case of unclean leader election or when a
> > > > > > consumer seeks to the largest offset of the partition. It would
> > > > > > complicate this KIP if we were to address these two problems.
> > > > > >
> > > > > > At this moment I prefer to keep this KIP simple by requiring
> > > > > > low_watermark <= high_watermark. The caveat is that if a user does want
> > > > > > to purge *all* the data that is already produced, then they need to stop
> > > > > > all producers that are producing into this topic, wait long enough for
> > > > > > all followers to catch up, and then purge data using the latest offset of
> > > > > > this partition, i.e. high_watermark. We can revisit this if some strong
> > > > > > use-case comes up in the future.
> > > > > >
> > > > > > I also updated the KIP to allow the user to use offset -1L to indicate
> > > > > > high_watermark in the PurgeRequest. In the future we can allow users to
> > > > > > use offset -2L to indicate that they want to purge all data up to
> > > > > > logEndOffset.
> > > > > >
> > > > > > Thanks!
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 18, 2017 at 10:37 AM, Jun Rao <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Hi, Dong,
> > > > > > >
> > > > > > > For 2(b), it seems a bit weird to allow highWatermark to be smaller
> > > > > > > than lowWatermark. Also, from the consumer's perspective, messages are
> > > > > > > available only up to highWatermark. What if we simply throw
> > > > > > > OffsetOutOfRangeException if offsetToPurge is larger than highWatermark?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Jan 17, 2017 at 9:54 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > Thank you. Please see my answers below. The KIP is updated to answer
> > > > > > > > these questions (see here
> > > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67636826&selectedPageVersions=5&selectedPageVersions=6>
> > > > > > > > ).
> > > > > > > >
> > > > > > > > 1. Yes, in this KIP we wait for all replicas. This is the same as if
> > > > > > > > the producer sends a message with ack=all and isr=all_replicas. So it
> > > > > > > > seems that the comparison is OK?
> > > > > > > >
> > > > > > > > 2. Good point! I haven't thought about the case where the
> > > > > > > > user-specified offset > logEndOffset. Please see answers below.
> > > > > > > >
> > > > > > > > a) If offsetToPurge < lowWatermark, the first condition of
> > > > > > > > DelayedOperationPurgatory will be satisfied immediately when the broker
> > > > > > > > receives the PurgeRequest. The broker will send the PurgeResponse to the
> > > > > > > > admin client immediately. The response maps this partition to the
> > > > > > > > lowWatermark.
> > > > > > > >
> > > > > > > > This case is covered as the first condition of
> > > > > > > > DelayedOperationPurgatory in the current KIP.
> > > > > > > >
> > > > > > > > b) If highWatermark < offsetToPurge < logEndOffset, the leader will
> > > > > > > > send FetchResponse with low_watermark=offsetToPurge. The follower
> > > > > > > > records the offsetToPurge as low_watermark and sends FetchRequest to the
> > > > > > > > leader with the new low_watermark. The leader will then send
> > > > > > > > PurgeResponse to the admin client, which maps this partition to the new
> > > > > > > > low_watermark. The data in the range [highWatermark, offsetToPurge] will
> > > > > > > > still be appended from the leader to followers but will not be exposed
> > > > > > > > to consumers. And for a short period of time, low_watermark on the
> > > > > > > > followers will be higher than their highWatermark.
> > > > > > > >
> > > > > > > > This case is also covered in the current KIP so no change is required.
> > > > > > > >
> > > > > > > > c) If logEndOffset < offsetToPurge, the leader will send PurgeResponse
> > > > > > > > to the admin client immediately. The response maps this partition to
> > > > > > > > OffsetOutOfRangeException.
> > > > > > > >
> > > > > > > > This case is not covered by the current KIP. I just added this as the
> > > > > > > > second condition for the PurgeRequest to be removed from
> > > > > > > > DelayedOperationPurgatory (in the Proposed Change section). Since the
> > > > > > > > PurgeRequest is satisfied immediately when the leader receives it, it
> > > > > > > > actually won't be put into the DelayedOperationPurgatory.
> > > > > > > >
> > > > > > > > 3. Yes, lowWatermark will be used when smallest_offset is used in the
> > > > > > > > ListOffsetRequest. I just updated the Proposed Change section to specify
> > > > > > > > this.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > > On Tue, Jan 17, 2017 at 6:53 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hi, Dong,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP. Looks good overall. Just a few more comments.
> > > > > > > > >
> > > > > > > > > 1. "Note that the way broker handles PurgeRequest is similar to how it
> > > > > > > > > handles ProduceRequest with ack = -1 and isr=all_replicas". It seems
> > > > > > > > > that the implementation is a bit different. In this KIP, we wait for
> > > > > > > > > all replicas. But in the producer, acks=all means waiting for all
> > > > > > > > > in-sync replicas.
> > > > > > > > >
> > > > > > > > > 2. Could you describe the behavior when the specified offsetToPurge is
> > > > > > > > > (a) smaller than lowWatermark, (b) larger than highWatermark but
> > > > > > > > > smaller than log end offset, (c) larger than log end offset?
> > > > > > > > >
> > > > > > > > > 3. In the ListOffsetRequest, will lowWatermark be returned when the
> > > > > > > > > smallest_offset option is used?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Jan 11, 2017 at 1:01 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > It seems that there is no further concern with KIP-107. At this
> > > > > > > > > > point we would like to start the voting process. The KIP can be
> > > > > > > > > > found at
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
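
For reference, the offsetToPurge handling that the quoted discussion above
converged on (reject offsets above high_watermark with
OffsetOutOfRangeException, answer immediately when the offset is already
below low_watermark, and treat -1L as shorthand for high_watermark) can be
sketched roughly as follows. PurgeOffsetCheck, Outcome and Result are
hypothetical names for illustration only, not broker code. Two details are
assumptions the thread does not spell out: an offset exactly equal to
low_watermark is treated as already purged, and negative offsets other than
-1L are rejected.

```java
/** Hypothetical helper; names are illustrative and not taken from the KIP. */
final class PurgeOffsetCheck {
    /** From the thread: -1L in the PurgeRequest stands for high_watermark. */
    static final long USE_HIGH_WATERMARK = -1L;

    enum Outcome { RESPOND_IMMEDIATELY, WAIT_FOR_REPLICAS, OFFSET_OUT_OF_RANGE }

    static final class Result {
        final Outcome outcome;
        /** Low watermark to report (or to wait for replicas to reach). */
        final long lowWatermark;

        Result(Outcome outcome, long lowWatermark) {
            this.outcome = outcome;
            this.lowWatermark = lowWatermark;
        }
    }

    static Result evaluate(long offsetToPurge, long lowWatermark, long highWatermark) {
        if (lowWatermark > highWatermark) {
            throw new IllegalArgumentException("low_watermark must be <= high_watermark");
        }
        // Assumption: any negative offset other than the -1L sentinel is invalid.
        if (offsetToPurge < 0 && offsetToPurge != USE_HIGH_WATERMARK) {
            throw new IllegalArgumentException("Invalid offsetToPurge " + offsetToPurge);
        }
        long target = (offsetToPurge == USE_HIGH_WATERMARK) ? highWatermark : offsetToPurge;

        if (target > highWatermark) {
            // Keep low_watermark unchanged and report the error to the client.
            return new Result(Outcome.OFFSET_OUT_OF_RANGE, lowWatermark);
        }
        if (target <= lowWatermark) {
            // Nothing left to purge: respond with the current low_watermark.
            return new Result(Outcome.RESPOND_IMMEDIATELY, lowWatermark);
        }
        // Otherwise the request waits until replicas report the new low_watermark.
        return new Result(Outcome.WAIT_FOR_REPLICAS, target);
    }
}
```

With low_watermark = 10 and high_watermark = 20, for example, an
offsetToPurge of 25 is rejected, 5 is answered immediately with 10, and
both 15 and -1L lead to a delayed response.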
