+1 - I was thinking the exact same thing.

On Tue, Oct 25, 2016 at 2:52 PM, Jun Rao <j...@confluent.io> wrote:
One of the main reasons for retaining messages on the broker after consumption is to support replay. A common reason for replay is to fix an application error. So, it seems that it's a bit hard to delete log segments just based on the committed offsets that the broker knows. An alternative approach is to support an API that can trim the log up to a specified offset (similar to what's being discussed in KIP-47). This way, an application can control when and how much to trim the log.

Thanks,

Jun

On Mon, Oct 24, 2016 at 11:11 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Overall I think the motivation is common and of interest to lots of users. I would like to throw my two cents into this discussion:

1. Kafka topics can be used in different ways. For some categories of topics (think: "pageView" event topics), the topic is shared among different teams / apps within the organization, and lots of temporary consumers (for debugging, troubleshooting, prototype development, etc.) can come and go dynamically; in that case it is hard to track all such consumers and maintain the minimum committed offset. On the other hand, there is another category of topics (think: stream-app-owned intermediate topics like "pricing-enriched-bid-activity", as Becket mentioned above) which are owned by only one or a few apps, and hence the consumer groups for those topics are pre-defined and roughly static. In this case I think it makes sense to allow such consumer-driven log retention features.

2. In this case, my question is then whether this bookkeeping of min-committed-offsets should be done on the broker side or on the app side. My gut feeling is that it is better bookkept on the app (i.e. client) side, which has the full information about the "registered consumer groups" for certain topics and can therefore compute the min-committed-offsets.
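The app-side bookkeeping Guozhang describes could be sketched roughly as follows. This is a minimal illustration only: the data structures are assumptions of this sketch, not an actual Kafka client API.

```python
# Sketch: app-side bookkeeping of the minimum committed offset per partition,
# across a set of registered consumer groups known to the app.
def min_committed_offsets(committed):
    """committed: {group -> {partition -> offset}} for the registered groups.
    Returns {partition -> min offset committed across all groups}, i.e. the
    offset below which every registered group has already consumed."""
    result = {}
    for group, offsets in committed.items():
        for partition, offset in offsets.items():
            if partition not in result or offset < result[partition]:
                result[partition] = offset
    return result
```

The app would then periodically send these values to the broker as a KIP-47-style trim/retention request, as discussed above.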
And a slightly-modified KIP-47, as mentioned by Dong, could be a better fit, where a) the app side bookkeeps the consumer-driven min offset based on the groups' committed offsets, by either talking to the consumer clients directly or querying the broker for the committed offsets of those registered consumer groups, and then b) writes *log.retention.min.offset* periodically to the broker to let it delete old segments before that offset (NOTE that the semantics are exactly the same as in KIP-47; the only difference is that we use an offset instead of a timestamp as the indicator, which can be honored by the same KIP-47 implementation on the broker side).

My arguments for letting the app side bookkeep such min offsets, and only letting brokers take requests to delete segments accordingly, are: 1) it keeps the broker simple, without brokers querying each other for such offsets and doing the min() calculation; they only keep / delete messages based on client admin requests; and 2) it allows more generalized client-driven log retention policies with KIP-47 (i.e. the broker is brainless and only takes requests, while the client app can apply any customized logic to determine the values of *log.retention.min.offset* or *log.retention.min.timestamp* that it sends to the brokers).

Guozhang

On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin <becket....@gmail.com> wrote:

Hi David,

> 1. What scenario is this configuration used for?

One scenario is a stream processing pipeline. In a stream processing DAG there will be a bunch of intermediate results; we only care about the consumer groups that are downstream in the DAG, but not other groups. Ideally we want to delete the log of the intermediate topics right after all the downstream processing jobs have successfully processed the messages.
In that case, we only care about the downstream processing jobs, but not other groups. That means if a downstream job did not commit offsets for some reason, we want to wait for that job. Without the predefined interested groups, this is hard to achieve.

2. Yes, the configuration should be at the topic level and set dynamically.

Thanks,

Jiangjie (Becket) Qin

On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote:

Hi Mayuresh,

Thanks for the reply:

1. In the log retention check schedule, the broker first finds all the consumer groups which are consuming this topic, and queries the committed offset of each of these groups for the topic using the OffsetFetch API. The min commit offset is the minimum among these committed offsets.

2. If the console consumer is reading and committing, its committed offset will be used to calculate the min commit offset for this topic. We can avoid the random consumer using the method Becket suggested.

3. It will not delete the log immediately; the log will stay for some time (retention.commitoffset.ms), and after that we only delete the log segments whose offsets are less than the min commit offset. So the user can rewind its offset within log.retention.ms.

Thanks,
David

------------------ Original message ------------------
From: "Mayuresh Gharat" <gharatmayures...@gmail.com>
Sent: October 19, 2016 (Wednesday) 10:25 AM
To: "dev" <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi David,

Thanks for the KIP.
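David's three numbered points above amount to a periodic check on the broker. A minimal sketch of that logic follows; the segment representation and the handling of `retention.commitoffset.ms` are assumptions reconstructed from this thread, not the actual broker code.

```python
import time

def segments_to_delete(segments, group_offsets, commit_retention_ms, now_ms=None):
    """segments: list of (base_offset, next_offset, last_modified_ms) tuples.
    group_offsets: committed offset per consumer group for this partition.
    A segment is eligible only if every group has consumed past it AND it
    has aged past the consumed-retention time (retention.commitoffset.ms)."""
    if not group_offsets:
        return []  # no committed offsets known: leave it to forced retention
    now_ms = now_ms if now_ms is not None else int(time.time() * 1000)
    min_commit = min(group_offsets.values())
    return [s for s in segments
            if s[1] <= min_commit and now_ms - s[2] >= commit_retention_ms]
```

Note how this keeps the "stay for some time" behavior from point 3: even fully-consumed segments survive until the consumed-retention clock expires, so a consumer can still rewind within that window.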
I had some questions/suggestions:

It would be great if you could explain in the KIP, with an example, how the min offset across all the consumers will be calculated. What I meant was: it would be great to understand, with pseudo code or a workflow if possible, how each broker knows all the consumers for a given topic-partition and how the min is calculated.

Also it would be good to understand what happens if we start a console consumer which starts reading from the beginning offset, commits, and crashes immediately. How will the segments get deleted?

Will it delete all the log segments if all the consumers have read up to the latest? If yes, would we be able to handle a scenario where we say that the user can rewind its offset to reprocess the data, since log.retention.ms might not have been reached?

Thanks,

Mayuresh

On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin <becket....@gmail.com> wrote:

Hey David,

Thanks for the replies to the questions.

I think one major thing still not clear at this point is whether the brokers will apply the consumed log retention only to a specific set of interested consumer groups, or whether there is no such set of consumer groups.

For example, for topic T, assume we know that there will be two downstream consumer groups CG1 and CG2 consuming data from topic T. Will we add a topic configuration such as "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that the brokers only care about CG1 and CG2?
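The per-topic "interested groups" configuration Becket sketches could be applied like this. Both the config name and the filtering step are hypothetical, taken from this discussion rather than from shipped Kafka.

```python
def effective_min_commit(all_group_offsets, interested_groups):
    """all_group_offsets: committed offset per group for a partition of topic T.
    interested_groups: the groups named in the hypothetical
    log.retention.commitoffset.interested.groups config, or None if unset.
    Returns the min committed offset over interested groups only, so random
    debug consumers cannot hold back consumed log retention; returns None if
    some interested group has not committed yet (we must wait for it)."""
    groups = interested_groups if interested_groups is not None else all_group_offsets.keys()
    offsets = [all_group_offsets.get(g) for g in groups]
    if not offsets or any(o is None for o in offsets):
        return None
    return min(offsets)
```

With the config set, a stray console-consumer group's offset is simply ignored; without it, every group that ever committed participates in the min.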
The committed offsets of other groups are of no interest and won't have any impact on the committed-offset-based log retention.

It seems the current proposal does not have an "interested consumer group set" configuration, so that means any random consumer group may affect the committed-offset-based log retention.

I think committed-offset-based log retention is more useful in cases where we already know which consumer groups will be consuming from the topic, so we will only wait for those consumer groups and ignore the others. If a topic will be consumed by many unknown or unpredictable consumer groups, the existing time-based log retention seems simple and clear enough. So I would argue we don't need to address the case where some groups may come later in committed-offset-based retention.

That said, there may still be value in keeping the data for some time even after all the interested consumer groups have consumed the messages. For example, in a pipelined stream processing DAG, we may want to keep the data of an intermediate topic for some time in case a job fails, so we can resume from a previously succeeded stage instead of restarting the entire pipeline. Or we can use the intermediate topic for some debugging work.

Thanks,

Jiangjie (Becket) Qin

On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <254479...@qq.com> wrote:

Hi Dong,

The KIP is meant to solve both of these two cases: we specify a small consumed log retention time to delete the consumed data and avoid losing un-consumed data.
And we specify a large forced log retention time as the upper bound for the data. I will update the KIP with this info.

Another solution I think may be OK is to support an API to delete an inactive group. If the group is inactive, but its committed offset is still in the __consumer_offsets topic and stays in the offset cache, we can delete it via this API.

Thanks,
David

------------------ Original message ------------------
From: "Dong Lin" <lindon...@gmail.com>
Sent: October 14, 2016 (Friday) 5:01 AM
To: "dev" <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi David,

As explained in the motivation section of the KIP, the problem is that if log retention is too small, we may lose data; and if log retention is too large, we waste disk space. Therefore, we need to solve one of the two problems -- allow data to be persisted longer for consumption if log retention is set too small, or allow data to be expired earlier if log retention is too large. I think the KIP probably needs to make this clear and explain which one is rejected and why. Note that the choice between the two affects the solution -- if we want to address the first problem then log.retention.ms should be used as a lower bound on the actual retention time, and if we want to address the second problem then log.retention.ms should be used as an upper bound on the actual retention time.
In both cases, we probably need to figure out a way to determine an "active consumer group". Maybe we can compare the time since the last commit against a threshold to determine this. In addition, the threshold can be overridden either per-topic or per-groupId. If we go along this route, the rejected alternative (per-topic vs. per-groupId) should probably be explained in the KIP.

Thanks,
Dong

On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin <lindon...@gmail.com> wrote:

Hi David,

Thanks for your explanation. There still seem to be issues with this solution. Please see my comments inline.

On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Dong,
> Sorry for the delay; here are the comments:
> 1. I think we should distinguish these two cases:
> (1) group has no member, but has a committed offset: in this case we should consider its committed offset.
> (2) group has no member and no committed offset: skip this group.
> Is that OK?
>
> The ListGroups API can list the groups, but it only shows online groups, so we should enhance the ListGroups API to also list the groups in case (1).

Say some user starts a consumer to consume topic A with enable.auto.commit = true. Later they change the group name in the config. Then the proposed solution will never execute consumed log retention for topic A, right?
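Dong's time-since-last-commit idea could be sketched as follows. The threshold is the config he proposes, not anything that shipped as part of KIP-68, so both the function and its parameters are illustrative assumptions.

```python
def active_groups(last_commit_ms, now_ms, inactive_threshold_ms):
    """last_commit_ms: per-group timestamp of the most recent offset commit.
    A group counts as active -- and can hold back consumed log retention --
    only if it has committed within the threshold; an abandoned group (e.g.
    a renamed console-consumer group) stops blocking deletion once it ages
    past the threshold."""
    return {g for g, t in last_commit_ms.items()
            if now_ms - t <= inactive_threshold_ms}
```

The broker-side retention check would then take the min committed offset over `active_groups(...)` only, which resolves the renamed-group scenario above.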
I think group name changes are pretty common, and we should take care of this issue. One possible solution is to add a config that specifies the maximum time since the last offset commit before we consider a group inactive.

> 2. Because every consumer group may appear at a different time -- say, group 1 starts to consume on day 1 and group 2 starts to consume on day 2 -- if we delete the log segment right away, group 2 cannot consume these messages. So we hope the messages can be held for a specified time. I think many use cases will need these configs if there are many consumer groups.

If we want to take care of group 2, can we simply disable consumed log retention for the topic and set log retention to 1 day? Can you explain the benefit of enabling consumed log retention and setting consumed log retention to 1 day?

Currently the flow graph in the KIP suggests that we delete data iff (consumed log retention is triggered OR forced log retention is triggered). An alternative is to delete data iff ((consumed log retention is disabled OR consumed log retention is triggered) AND forced log retention is triggered). I would argue that the 2nd scheme is better. Say consumed log retention is enabled. The 1st scheme basically interprets forced log retention as the upper bound of the time the data can stay in Kafka.
The 2nd scheme interprets forced log retention as the lower bound of the time the data can stay in Kafka, which is more consistent with the purpose of having forced log retention (to save disk space). And if we adopt the 2nd scheme, the use case you suggested can easily be addressed by setting forced log retention to 1 day and enabling consumed log retention. What do you think?

> Thanks,
> David

------------------ Original message ------------------
From: "Dong Lin" <lindon...@gmail.com>
Sent: October 10, 2016 (Monday) 4:05 PM
To: "dev" <dev@kafka.apache.org>
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hey David,

Thanks for the reply. Please see my comments inline.

On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) <pengwei...@huawei.com> wrote:

> Hi Dong,
> Thanks for the questions:
>
> 1. Currently we don't distinguish inactive and active groups, because in some cases an inactive group may become active again and use its previous committed offset.
>
> So we will not delete the log segment under consumed retention if some groups consume but do not commit, but the log segment can still be deleted by forced retention.
So in the example I provided, consumed log retention will effectively be disabled, right? This seems to be a real problem in operation -- we don't want log retention to be unintentionally disabled simply because someone starts a tool to consume from that topic. Either this KIP should provide a way to handle this, or there should be a way for an operator to be aware of such a case and be able to re-enable consumed log retention for the topic. What do you think?

> 2. These configs are used to determine the expiry time for consumed retention, like the parameters of forced retention (log.retention.hours, log.retention.minutes, log.retention.ms). For example, if users want to keep the log for 3 days, then after 3 days Kafka will delete the log segments which have been consumed by all the consumer groups. The log retention thread needs these parameters.

It makes sense to have configs such as log.retention.ms -- they make data available for up to a configured amount of time before it is deleted. My question is: what is the use case for keeping the log available for another e.g. 3 days after it has been consumed by all consumer groups? The purpose of this KIP is to allow the log to be deleted as soon as all interested consumer groups have consumed it.
Can you provide a use case for keeping the log available for a longer time after it has been consumed by all groups?

> Thanks,
> David

> Hey David,
>
> Thanks for the KIP. Can you help with the following two questions:
>
> 1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a topic for debug/validation purposes, a random consumer group may be created and an offset may be committed for this consumer group. If no offset commit is made for this consumer group in the future, will this effectively disable consumed log retention for the topic? In other words, how does this KIP distinguish active consumer groups from inactive ones?
>
> 2) Why do we need new configs such as log.retention.commitoffset.hours? Can we simply delete log segments if consumed log retention is enabled for the topic and all consumer groups have consumed the messages in the log segment?
>
> Thanks,
> Dong

On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) <pengwei...@huawei.com> wrote:

Hi Becket,

Thanks for the feedback:

1. We use the simple consumer API to query the committed offset, so we don't need to specify the consumer group.
2. Every broker uses the simple consumer API (OffsetFetchKey) to query the committed offset in the log retention process. The client may or may not commit offsets.
3. It does not need to distinguish follower brokers from leader brokers; every broker can query.
4. We don't need to change the protocols; we mainly change the log retention process in the log manager.

One concern is that querying the min offset needs O(partitions * groups) time complexity; an alternative is to build an internal topic to save every partition's min offset, which reduces this to O(1). I will update the wiki with more details.

Thanks,
David

> Hi Pengwei,
>
> Thanks for the KIP proposal. It is a very useful KIP. At a high level, the proposed behavior looks reasonable to me.
>
> However, it seems that some of the details are not mentioned in the KIP. For example:
>
> 1. How will the expected consumer group be specified? Is it through a per-topic dynamic configuration?
> 2. How do the brokers detect the consumer offsets? Is it required for a consumer to commit offsets?
> 3. How do all the replicas know about the committed offsets? e.g. 1) non-coordinator brokers which do not have the committed offsets, 2) follower brokers which do not have consumers directly consuming from them.
> 4. Are there any other changes that need to be made (e.g. new protocols) in addition to the configuration change?
>
> It would be great if you can update the wiki to have more details.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) <pengwei...@huawei.com> wrote:
>
> > Hi All,
> > I have made a KIP to enhance the log retention; details as follows:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > Now I am starting a discussion thread for this KIP, looking forward to the feedback.
> >
> > Thanks,
> > David

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125

--
-- Guozhang