Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Guozhang,

I do agree with keeping the broker simple, but the consumed log retention process is not that complex (it runs on the same thread as the log retention thread, only adding an offset-querying step), and it is disabled by default. I will update the patch as soon as possible so you can see it.

As for the method of having all consumers agree on a specified point: this seems hard to determine in a cloud environment (many consumer groups have different consume points), since it requires all the consumers to coordinate on a specified point. The minimum committed offset seems to be the safest point to delete up to. Implementing this in the client app would also have the problems mentioned in the previous email.

Thanks,
David

------ Original Message ------
From: "Guozhang Wang"
Sent: Thursday, Nov 17, 2016, 3:21
To: "dev@kafka.apache.org"
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

David,

I think it would be better to implement such synchronization (i.e. making sure all consumers have done fetching to that point, and no one will ever want to go back and re-consume) on the admin side, not on the broker side, since 1) we want to keep the broker system simple, and would rather have a "layered architecture" with such admin features on top of or beside the brokers rather than built inside them, and 2) for some synchronization purposes like "making sure no one will ever want to go back and re-consume", the brokers would not have any clue, and it needs to be implemented from application to application anyway.

What do you think?

Guozhang

On Sun, Nov 13, 2016 at 6:16 AM, 东方甲乙 <254479...@qq.com> wrote:
> Hi Becket,
> If we use the trim.on.offset.commit parameter, it will help to quickly trim the log, but another consumer group's consumers may find that the messages have been trimmed.
> We still need to coordinate many consumer groups to trim the log, and it seems difficult for a single consumer to do that.
> Then it still comes down to the problem: whether to implement this on the broker side or in the admin client.
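The "minimum committed offset is the safest point" rule David describes can be sketched as pure logic. `safe_trim_offset` is a hypothetical helper for illustration, not part of any Kafka API:

```python
def safe_trim_offset(committed_by_group):
    """Return the highest offset safe to trim (delete) up to for one
    partition: the minimum committed offset across all consumer groups.
    A group with no commit yet (None) blocks trimming, since trimming
    anything could delete messages it has not consumed."""
    offsets = list(committed_by_group.values())
    if not offsets or None in offsets:
        return 0
    return min(offsets)

# Three groups at different consume points: only [0, 95) is consumed by all.
print(safe_trim_offset({"groupA": 120, "groupB": 95, "groupC": 200}))  # 95
```

This is exactly why broker- or admin-side coordination is needed: any single group acting alone would not know the other groups' commit positions.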
> Even if we implement it on the broker side, we can still use the trim API to finish the log deletion for leader or replica segments. And we can offer an option to safely delete the log (disabled by default), so this is the motivation for this KIP.
>
> Thanks,
> David
>
> ------ Original Message ------
> From: "Becket Qin"
> Sent: Sunday, Nov 6, 2016, 11:39 PM
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> Hi David,
>
> I am thinking that, depending on the use case, we may not need a separate tool to get committed-message-based retention using the trim() method. One way to do this is to have a configuration like trim.on.offset.commit in the consumer, so that after committing the offset the consumer will also send a trim request to the broker.
>
> In some cases, the application may want to trim the log in a more flexible way, e.g. not trimming on commit but every hour. In that case, it is true that users will need to trim the log with a separate admin client. However, that logic could be a long-running stand-alone service independent of Kafka or the application. It may have its own configurations as we discussed in this KIP, so the applications in that case would just talk to that service to trim the log instead of talking to Kafka.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Nov 6, 2016 at 6:10 AM, 东方甲乙 <254479...@qq.com> wrote:
> > Hi Becket,
> > The most important benefit of method (2) is that we can safely delete the log segments, because all the deleted log segments have been consumed.
> > If the messages are very important, in this case we need to safely delete the log segments instead of forcibly deleting them after the retention time.
> > Kafka itself can ensure all the deleted logs have been consumed, to improve end-to-end reliability. And this feature is disabled by default, so things stay simple for people not using it.
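Becket's trim.on.offset.commit idea can be sketched as a thin wrapper around the commit path. The callbacks stand in for the real consumer commit and broker trim calls; all names here are illustrative, not the proposed protocol:

```python
class CommitThenTrim:
    """Sketch of trim.on.offset.commit: after every offset commit,
    also ask the broker to trim the partition up to that offset."""

    def __init__(self, commit_fn, trim_fn, trim_on_offset_commit=False):
        self.commit_fn = commit_fn
        self.trim_fn = trim_fn
        self.trim_on_offset_commit = trim_on_offset_commit

    def commit(self, partition, offset):
        self.commit_fn(partition, offset)
        if self.trim_on_offset_commit:
            # The hazard David points out: this trims without consulting
            # other consumer groups reading the same topic.
            self.trim_fn(partition, offset)

calls = []
c = CommitThenTrim(lambda p, o: calls.append(("commit", p, o)),
                   lambda p, o: calls.append(("trim", p, o)),
                   trim_on_offset_commit=True)
c.commit(0, 42)
print(calls)  # [('commit', 0, 42), ('trim', 0, 42)]
```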
> > Actually, users could build a tool using the TrimRequest to do this work (method 1), but users would have to keep this tool running alongside Kafka all the time, which may not always hold.
> >
> > Thanks,
> > David
> >
> > ------ Original Message ------
> > From: "Becket Qin"
> > Sent: Tuesday, Nov 1, 2016, 3:57 AM
> > To: "dev"
> > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
David,

I think it would be better to implement such synchronization (i.e. making sure all consumers have done fetching to that point, and no one will ever want to go back and re-consume) on the admin side, not on the broker side, since 1) we want to keep the broker system simple, and would rather have a "layered architecture" with such admin features on top of or beside the brokers rather than built inside them, and 2) for some synchronization purposes like "making sure no one will ever want to go back and re-consume", the brokers would not have any clue, and it needs to be implemented from application to application anyway.

What do you think?

Guozhang

On Sun, Nov 13, 2016 at 6:16 AM, 东方甲乙 <254479...@qq.com> wrote:
> Hi Becket,
> If we use the trim.on.offset.commit parameter, it will help to quickly trim the log, but another consumer group's consumers may find that the messages have been trimmed.
> We still need to coordinate many consumer groups to trim the log, and it seems difficult for a single consumer to do that.
> Then it still comes down to the problem: whether to implement this on the broker side or in the admin client. Even if we implement it on the broker side, we can still use the trim API to finish the log deletion for leader or replica segments. And we can offer an option to safely delete the log (disabled by default), so this is the motivation for this KIP.
>
> Thanks,
> David
>
> ------ Original Message ------
> From: "Becket Qin"
> Sent: Sunday, Nov 6, 2016, 11:39 PM
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> Hi David,
>
> I am thinking that, depending on the use case, we may not need a separate tool to get committed-message-based retention using the trim() method. One way to do this is to have a configuration like trim.on.offset.commit in the consumer, so that after committing the offset the consumer will also send a trim request to the broker.
> In some cases, the application may want to trim the log in a more flexible way, e.g. not trimming on commit but every hour. In that case, it is true that users will need to trim the log with a separate admin client. However, that logic could be a long-running stand-alone service independent of Kafka or the application. It may have its own configurations as we discussed in this KIP, so the applications in that case would just talk to that service to trim the log instead of talking to Kafka.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Nov 6, 2016 at 6:10 AM, 东方甲乙 <254479...@qq.com> wrote:
> > Hi Becket,
> > The most important benefit of method (2) is that we can safely delete the log segments, because all the deleted log segments have been consumed.
> > If the messages are very important, in this case we need to safely delete the log segments instead of forcibly deleting them after the retention time.
> > Kafka itself can ensure all the deleted logs have been consumed, to improve end-to-end reliability. And this feature is disabled by default, so things stay simple for people not using it.
> > Actually, users could build a tool using the TrimRequest to do this work (method 1), but users would have to keep this tool running alongside Kafka all the time, which may not always hold.
> >
> > Thanks,
> > David
> >
> > ------ Original Message ------
> > From: "Becket Qin"
> > Sent: Tuesday, Nov 1, 2016, 3:57 AM
> > To: "dev"
> > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
> >
> > Hi David,
> >
> > I think the trim() API is generally useful for consume-based retention as well as other use cases. So we should probably have (1).
> >
> > For (2), it is more of an optimization that does users a favor. It could be implemented on top of (1) if we want to. So maybe we can implement (1) first and let the applications do the trim() by themselves at this point.
> > This will put more burden on the application side, but that is not too bad if there is only one downstream consumer group. In the future, if we find more use cases where multiple downstream consumer groups need to coordinate among themselves and broker-side help would make things simpler, we can add (2) then.
> >
> > Regarding the relation between this KIP and KIP-47: at a high level they are very similar, i.e. trim() by timestamp vs. trim() by offsets.
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Becket,

If we use the trim.on.offset.commit parameter, it will help to quickly trim the log, but another consumer group's consumers may find that the messages have been trimmed. We still need to coordinate many consumer groups to trim the log, and it seems difficult for a single consumer to do that. Then it still comes down to the problem: whether to implement this on the broker side or in the admin client. Even if we implement it on the broker side, we can still use the trim API to finish the log deletion for leader or replica segments. And we can offer an option to safely delete the log (disabled by default), so this is the motivation for this KIP.

Thanks,
David

------ Original Message ------
From: "Becket Qin"
Sent: Sunday, Nov 6, 2016, 11:39 PM
To: "dev"
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi David,

I am thinking that, depending on the use case, we may not need a separate tool to get committed-message-based retention using the trim() method. One way to do this is to have a configuration like trim.on.offset.commit in the consumer, so that after committing the offset the consumer will also send a trim request to the broker.

In some cases, the application may want to trim the log in a more flexible way, e.g. not trimming on commit but every hour. In that case, it is true that users will need to trim the log with a separate admin client. However, that logic could be a long-running stand-alone service independent of Kafka or the application. It may have its own configurations as we discussed in this KIP, so the applications in that case would just talk to that service to trim the log instead of talking to Kafka.

Thanks,

Jiangjie (Becket) Qin

On Sun, Nov 6, 2016 at 6:10 AM, 东方甲乙 <254479...@qq.com> wrote:
> Hi Becket,
> The most important benefit of method (2) is that we can safely delete the log segments, because all the deleted log segments have been consumed.
> If the messages are very important, in this case we need to safely delete the log segments instead of forcibly deleting them after the retention time.
> Kafka itself can ensure all the deleted logs have been consumed, to improve end-to-end reliability. And this feature is disabled by default, so things stay simple for people not using it.
> Actually, users could build a tool using the TrimRequest to do this work (method 1), but users would have to keep this tool running alongside Kafka all the time, which may not always hold.
>
> Thanks,
> David
>
> ------ Original Message ------
> From: "Becket Qin"
> Sent: Tuesday, Nov 1, 2016, 3:57 AM
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> Hi David,
>
> I think the trim() API is generally useful for consume-based retention as well as other use cases. So we should probably have (1).
>
> For (2), it is more of an optimization that does users a favor. It could be implemented on top of (1) if we want to. So maybe we can implement (1) first and let the applications do the trim() by themselves at this point. This will put more burden on the application side, but that is not too bad if there is only one downstream consumer group. In the future, if we find more use cases where multiple downstream consumer groups need to coordinate among themselves and broker-side help would make things simpler, we can add (2) then.
>
> Regarding the relation between this KIP and KIP-47: at a high level they are very similar, i.e. trim() by timestamp vs. trim() by offsets. It would be worth thinking about them together. After KIP-79 we can search messages by timestamp, which essentially translates a timestamp to offsets. So KIP-47 could also be built on top of the trim()-by-offsets interface after translating the timestamp to offsets. Jun has suggested an implementation in the KIP-47 discussion thread which introduces a new TrimRequest. Would you take a look and see if that could be used for KIP-68 as well?
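Becket's point that trim-by-timestamp reduces to trim-by-offset once a timestamp can be translated to an offset can be sketched against a toy time index. The real index format is more involved; this is only an assumption-laden illustration of the lookup:

```python
import bisect

def offset_for_time(time_index, target_ts):
    """Given a sorted list of (timestamp, offset) pairs, return the offset
    of the first entry at or after target_ts, or None if every entry is
    older. Time-based trimming (KIP-47 style) then becomes a plain
    trim-by-offset at the returned value."""
    timestamps = [ts for ts, _ in time_index]
    i = bisect.bisect_left(timestamps, target_ts)
    return None if i == len(time_index) else time_index[i][1]

index = [(1000, 0), (2000, 50), (3000, 120)]
print(offset_for_time(index, 1500))  # 50: first message at or after ts 1500
print(offset_for_time(index, 9999))  # None: nothing that recent yet
```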
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Oct 30, 2016 at 2:24 AM, 东方甲乙 <254479...@qq.com> wrote:
> > Hi All,
> >
> > As per our discussion, there are two ways to clean the consumed log:
> >
> > 1) Use an admin tool to find the minimum committed offset for some topics across the specified set of consumer groups, then send the trim API to all the replicas of the brokers; the brokers will then start to trim the log segments of these topics.
> >
> > The benefit of this method is that it keeps the broker simple and is more flexible for the users, but it is more complicated for the users to clean all the messages which are consumed.
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David,

I am thinking that, depending on the use case, we may not need a separate tool to get committed-message-based retention using the trim() method. One way to do this is to have a configuration like trim.on.offset.commit in the consumer, so that after committing the offset the consumer will also send a trim request to the broker.

In some cases, the application may want to trim the log in a more flexible way, e.g. not trimming on commit but every hour. In that case, it is true that users will need to trim the log with a separate admin client. However, that logic could be a long-running stand-alone service independent of Kafka or the application. It may have its own configurations as we discussed in this KIP, so the applications in that case would just talk to that service to trim the log instead of talking to Kafka.

Thanks,

Jiangjie (Becket) Qin

On Sun, Nov 6, 2016 at 6:10 AM, 东方甲乙 <254479...@qq.com> wrote:
> Hi Becket,
> The most important benefit of method (2) is that we can safely delete the log segments, because all the deleted log segments have been consumed.
> If the messages are very important, in this case we need to safely delete the log segments instead of forcibly deleting them after the retention time.
> Kafka itself can ensure all the deleted logs have been consumed, to improve end-to-end reliability. And this feature is disabled by default, so things stay simple for people not using it.
> Actually, users could build a tool using the TrimRequest to do this work (method 1), but users would have to keep this tool running alongside Kafka all the time, which may not always hold.
>
> Thanks,
> David
>
> ------ Original Message ------
> From: "Becket Qin"
> Sent: Tuesday, Nov 1, 2016, 3:57 AM
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> Hi David,
>
> I think the trim() API is generally useful for consume-based retention as well as other use cases. So we should probably have (1).
> For (2), it is more of an optimization that does users a favor. It could be implemented on top of (1) if we want to. So maybe we can implement (1) first and let the applications do the trim() by themselves at this point. This will put more burden on the application side, but that is not too bad if there is only one downstream consumer group. In the future, if we find more use cases where multiple downstream consumer groups need to coordinate among themselves and broker-side help would make things simpler, we can add (2) then.
>
> Regarding the relation between this KIP and KIP-47: at a high level they are very similar, i.e. trim() by timestamp vs. trim() by offsets. It would be worth thinking about them together. After KIP-79 we can search messages by timestamp, which essentially translates a timestamp to offsets. So KIP-47 could also be built on top of the trim()-by-offsets interface after translating the timestamp to offsets. Jun has suggested an implementation in the KIP-47 discussion thread which introduces a new TrimRequest. Would you take a look and see if that could be used for KIP-68 as well?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Oct 30, 2016 at 2:24 AM, 东方甲乙 <254479...@qq.com> wrote:
> > Hi All,
> >
> > As per our discussion, there are two ways to clean the consumed log:
> >
> > 1) Use an admin tool to find the minimum committed offset for some topics across the specified set of consumer groups, then send the trim API to all the replicas of the brokers; the brokers will then start to trim the log segments of these topics.
> >
> > The benefit of this method is that it keeps the broker simple and is more flexible for the users, but it is more complicated for the users to clean all the messages which are consumed.
> >
> > 2) The broker will periodically do the consumed log retention as the KIP mentioned.
> > This method is simple for the users and it can automatically clean the consumed log, but it adds more query work to the brokers.
> >
> > Which method is better?
> >
> > Thanks,
> > David
> >
> > ------ Original Message ------
> > From: "Mayuresh Gharat"
> > Sent: Saturday, Oct 29, 2016, 1:43 AM
> > To: "dev"
> > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
> >
> > I do agree with Guozhang on having applications request an external service (admin) that talks to Kafka for trimming, in which case this external service (admin) can check whether it is OK to send the trim request to the Kafka brokers based on certain conditions.
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Becket,

The most important benefit of method (2) is that we can safely delete the log segments, because all the deleted log segments have been consumed. If the messages are very important, in this case we need to safely delete the log segments instead of forcibly deleting them after the retention time. Kafka itself can ensure all the deleted logs have been consumed, to improve end-to-end reliability. And this feature is disabled by default, so things stay simple for people not using it. Actually, users could build a tool using the TrimRequest to do this work (method 1), but users would have to keep this tool running alongside Kafka all the time, which may not always hold.

Thanks,
David

------ Original Message ------
From: "Becket Qin"
Sent: Tuesday, Nov 1, 2016, 3:57 AM
To: "dev"
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi David,

I think the trim() API is generally useful for consume-based retention as well as other use cases. So we should probably have (1).

For (2), it is more of an optimization that does users a favor. It could be implemented on top of (1) if we want to. So maybe we can implement (1) first and let the applications do the trim() by themselves at this point. This will put more burden on the application side, but that is not too bad if there is only one downstream consumer group. In the future, if we find more use cases where multiple downstream consumer groups need to coordinate among themselves and broker-side help would make things simpler, we can add (2) then.

Regarding the relation between this KIP and KIP-47: at a high level they are very similar, i.e. trim() by timestamp vs. trim() by offsets. It would be worth thinking about them together. After KIP-79 we can search messages by timestamp, which essentially translates a timestamp to offsets. So KIP-47 could also be built on top of the trim()-by-offsets interface after translating the timestamp to offsets. Jun has suggested an implementation in the KIP-47 discussion thread which introduces a new TrimRequest.
Would you take a look and see if that could be used for KIP-68 as well?

Thanks,

Jiangjie (Becket) Qin

On Sun, Oct 30, 2016 at 2:24 AM, 东方甲乙 <254479...@qq.com> wrote:
> Hi All,
>
> As per our discussion, there are two ways to clean the consumed log:
>
> 1) Use an admin tool to find the minimum committed offset for some topics across the specified set of consumer groups, then send the trim API to all the replicas of the brokers; the brokers will then start to trim the log segments of these topics.
>
> The benefit of this method is that it keeps the broker simple and is more flexible for the users, but it is more complicated for the users to clean all the messages which are consumed.
>
> 2) The broker will periodically do the consumed log retention as the KIP mentioned. This method is simple for the users and it can automatically clean the consumed log, but it adds more query work to the brokers.
>
> Which method is better?
>
> Thanks,
> David
>
> ------ Original Message ------
> From: "Mayuresh Gharat"
> Sent: Saturday, Oct 29, 2016, 1:43 AM
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> I do agree with Guozhang on having applications request an external service (admin) that talks to Kafka for trimming, in which case this external service (admin) can check whether it is OK to send the trim request to the Kafka brokers based on certain conditions.
> On the broker side we can have authorization by way of ACLs, maybe, saying that only this external admin service is allowed to call trim(). In this way we can actually move the main decision-making process out of core.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Oct 28, 2016 at 10:33 AM, Guozhang Wang wrote:
> > Yes, trim() should be an admin API and, if security is a concern, it should be under admin authorization as well.
> > For applications that need this feature, it then boils down to the problem that they should request the authorization token from whoever operates Kafka before starting their app, to use in their own client, which I think is a feasible requirement.
> >
> > Guozhang
> >
> > On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > Hi Guozhang,
> > >
> > > I agree that pushing the complexity of coordination out to the client application makes things simpler for the broker, in the sense that it does not have to be the decision maker regarding when to trim and up to what offset.
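Mayuresh's ACL suggestion and the offset-parameter discussion combine into one hedged sketch of broker-side handling. The allow-list check and the clamping behavior are assumptions for illustration, not Kafka's actual TrimRequest semantics:

```python
def handle_trim_request(principal, requested_offset, log_start, log_end,
                        allowed_principals):
    """Authorize the caller against an ACL-style allow list, then advance
    the log start offset, never moving it backwards or past the log end.
    Returns the new log start offset."""
    if principal not in allowed_principals:
        raise PermissionError("trim() requires admin authorization")
    return max(log_start, min(requested_offset, log_end))

admins = {"trim-service"}
print(handle_trim_request("trim-service", 150, 100, 200, admins))  # 150
print(handle_trim_request("trim-service", 500, 100, 200, admins))  # 200 (clamped)
```

Restricting the operation to one admin principal is what moves the decision-making out of the broker core: the broker only validates and applies, while the external service decides when and how far to trim.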
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David,

I think the trim() API is generally useful for consume-based retention as well as other use cases. So we should probably have (1).

For (2), it is more of an optimization that does users a favor. It could be implemented on top of (1) if we want to. So maybe we can implement (1) first and let the applications do the trim() by themselves at this point. This will put more burden on the application side, but that is not too bad if there is only one downstream consumer group. In the future, if we find more use cases where multiple downstream consumer groups need to coordinate among themselves and broker-side help would make things simpler, we can add (2) then.

Regarding the relation between this KIP and KIP-47: at a high level they are very similar, i.e. trim() by timestamp vs. trim() by offsets. It would be worth thinking about them together. After KIP-79 we can search messages by timestamp, which essentially translates a timestamp to offsets. So KIP-47 could also be built on top of the trim()-by-offsets interface after translating the timestamp to offsets. Jun has suggested an implementation in the KIP-47 discussion thread which introduces a new TrimRequest. Would you take a look and see if that could be used for KIP-68 as well?

Thanks,

Jiangjie (Becket) Qin

On Sun, Oct 30, 2016 at 2:24 AM, 东方甲乙 <254479...@qq.com> wrote:
> Hi All,
>
> As per our discussion, there are two ways to clean the consumed log:
>
> 1) Use an admin tool to find the minimum committed offset for some topics across the specified set of consumer groups, then send the trim API to all the replicas of the brokers; the brokers will then start to trim the log segments of these topics.
>
> The benefit of this method is that it keeps the broker simple and is more flexible for the users, but it is more complicated for the users to clean all the messages which are consumed.
>
> 2) The broker will periodically do the consumed log retention as the KIP mentioned.
> This method is simple for the users and it can automatically clean the consumed log, but it adds more query work to the brokers.
>
> Which method is better?
>
> Thanks,
> David
>
> ------ Original Message ------
> From: "Mayuresh Gharat"
> Sent: Saturday, Oct 29, 2016, 1:43 AM
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> I do agree with Guozhang on having applications request an external service (admin) that talks to Kafka for trimming, in which case this external service (admin) can check whether it is OK to send the trim request to the Kafka brokers based on certain conditions.
> On the broker side we can have authorization by way of ACLs, maybe, saying that only this external admin service is allowed to call trim(). In this way we can actually move the main decision-making process out of core.
>
> Thanks,
>
> Mayuresh
>
> On Fri, Oct 28, 2016 at 10:33 AM, Guozhang Wang wrote:
> > Yes, trim() should be an admin API and, if security is a concern, it should be under admin authorization as well.
> >
> > For applications that need this feature, it then boils down to the problem that they should request the authorization token from whoever operates Kafka before starting their app, to use in their own client, which I think is a feasible requirement.
> >
> > Guozhang
> >
> > On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > Hi Guozhang,
> > >
> > > I agree that pushing the complexity of coordination out to the client application makes things simpler for the broker, in the sense that it does not have to be the decision maker regarding when to trim and up to what offset. And I agree that if we go in this direction, providing an offset parameter makes sense.
> > > But since the main motivation for this seems to be saving or reclaiming disk space on the broker side, I am not 100% sure how good it is to rely on the client application being a good citizen and calling the trim API.
> > > Also, I see the trim() API as more of an admin API rather than a client API.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang wrote:
> > > > Here are my thoughts:
> > > >
> > > > If there are indeed multiple consumer groups on the same topic that
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi All,

As per our discussion, there are two ways to clean the consumed log:

1) Use an admin tool to find the minimum committed offset for some topics across the specified set of consumer groups, then send the trim API to all the replicas of the brokers; the brokers will then start to trim the log segments of these topics.

The benefit of this method is that it keeps the broker simple and is more flexible for the users, but it is more complicated for the users to clean all the messages which are consumed.

2) The broker will periodically do the consumed log retention as the KIP mentioned. This method is simple for the users and it can automatically clean the consumed log, but it adds more query work to the brokers.

Which method is better?

Thanks,
David

------ Original Message ------
From: "Mayuresh Gharat"
Sent: Saturday, Oct 29, 2016, 1:43 AM
To: "dev"
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

I do agree with Guozhang on having applications request an external service (admin) that talks to Kafka for trimming, in which case this external service (admin) can check whether it is OK to send the trim request to the Kafka brokers based on certain conditions.
On the broker side we can have authorization by way of ACLs, maybe, saying that only this external admin service is allowed to call trim(). In this way we can actually move the main decision-making process out of core.

Thanks,

Mayuresh

On Fri, Oct 28, 2016 at 10:33 AM, Guozhang Wang wrote:
> Yes, trim() should be an admin API and, if security is a concern, it should be under admin authorization as well.
>
> For applications that need this feature, it then boils down to the problem that they should request the authorization token from whoever operates Kafka before starting their app, to use in their own client, which I think is a feasible requirement.
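Method (2)'s broker-side check can be sketched as: a segment becomes deletable once every message in it sits below the minimum committed offset of all tracked consumer groups. This is a simplified model of the KIP's proposal with invented names, not the actual implementation:

```python
def deletable_segments(segments, committed_by_group):
    """segments: list of (base_offset, next_offset) pairs, oldest first,
    where next_offset is one past the segment's last message.
    Return the segments fully consumed by every group, i.e. those whose
    next_offset is at or below the minimum committed offset."""
    offsets = list(committed_by_group.values())
    if not offsets or None in offsets:
        return []  # an uncommitted group blocks consumed-log retention
    min_committed = min(offsets)
    return [s for s in segments if s[1] <= min_committed]

segs = [(0, 100), (100, 200), (200, 300)]
print(deletable_segments(segs, {"g1": 250, "g2": 180}))  # [(0, 100)]
```

The extra query work David mentions is visible here: the broker would have to fetch every group's committed offset for every partition on each retention pass, which is exactly what method (1) pushes out to an admin tool instead.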
> Guozhang
>
> On Fri, Oct 28, 2016 at 9:42 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > Hi Guozhang,
> >
> > I agree that pushing the complexity of coordination out to the client application makes things simpler for the broker, in the sense that it does not have to be the decision maker regarding when to trim and up to what offset. And I agree that if we go in this direction, providing an offset parameter makes sense.
> >
> > But since the main motivation for this seems to be saving or reclaiming disk space on the broker side, I am not 100% sure how good it is to rely on the client application being a good citizen and calling the trim API.
> > Also, I see the trim() API as more of an admin API rather than a client API.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Oct 28, 2016 at 7:12 AM, Guozhang Wang wrote:
> > > Here are my thoughts:
> > >
> > > If there are indeed multiple consumer groups on the same topic that need to coordinate, it is equally complex whether the coordination is on the broker or among the applications themselves: for the latter case, you would imagine some coordination service (like ZK) being used to register groups for that topic and let these groups agree upon the minimum offset that is safe to trim for all of them; for the former case, we just need to move this coordination service into the broker side, which to me is not a good design under the principle of keeping the broker simple.
> > >
> > > And as we discussed, there are scenarios where the offset to trim at does not necessarily depend on the committed offsets, even if the topic is only consumed by a single consumer group and we do not need any coordination. So I think it is appropriate to require an "offset" parameter in the trim API.
> > > Guozhang
> > >
> > > On Fri, Oct 28, 2016 at 1:27 AM, Becket Qin wrote:
> > > > Hey Guozhang,
> > > >
> > > > I think the trim() interface is generally useful. What I was wondering is the following: if the user has multiple applications to coordinate, it seems simpler for the broker to coordinate instead of asking the applications to coordinate among themselves. If we let the broker do the coordination and do not want to reuse the committed offset for trim(), we kind of need something like
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
> > > > > features.
> > > > >
> > > > > 2. In this case, my question is then whether this bookkeeping of min-committed-offsets should be done on the broker side or on the app side. My gut feeling is that it would be better bookkept on the app (i.e. client) side, which has the full information about the "registered consumer groups" for certain topics and hence knows the min-committed-offsets. A slightly modified KIP-47, as mentioned by Dong, could be a better fit, where a) the app side bookkeeps the consumer-driven min offset based on the groups' committed offsets, by either talking to the consumer clients directly or querying the broker for the committed offsets of those registered consumer groups, and then b) periodically writes *log.retention.min.offset* to the broker to let it delete old segments before that offset (NOTE that the semantics is exactly the same as KIP-47, the only difference being that we use an offset instead of a timestamp to indicate it, which can be honored by the same implementation of KIP-47 on the broker side).
> > > > > > > > > > > > > > > > > > My arguments for letting the app side to bookkeep such > > > > min-offsets > > > > > > and > > > > > > > > only > > > > > > > > > let brokers to take requests to delete segments accordingly > > are > > > > 1) > > > > > > > > keeping > > > > > > > > > the broker simple without any querying each other about > such > > > > > offsets > > > > > > > and > > > > > > > > > does the min() calculation, rather only keeping / deleting > > > > messages > > > > > > > from > > > > > > > > > client admin requests, and 2) allowing more generalized > > > > > client-driven > > > > > > > log > > > > > > > > > retention policies with KIP-47 (i.e. broker is brainless > and > > > only > > > > > > take > > > > > > > > > requests while client-app can apply any customized logic to > > > > > determine > > > > > > > the > > > > > > > > > config values of *og.retention.min.offset or > > > > > > > > **og.retention.min.timestamp* > > > > > > > > > that > > > > > > > > > they send to the brokers). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin < > > > > becket@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi David, > > > > > > > > > > > > > > > > > > > > > 1. What scenario is used to this configuration? > > > > > > > > > > > > > > > > > > > > One scenario is stream processing pipeline. In a stream > > > > > processing > > > >
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
tation of KIP-47 on broker side). > > > > > > > > > > > > > > > > My arguments for letting the app side to bookkeep such > > > min-offsets > > > > > and > > > > > > > only > > > > > > > > let brokers to take requests to delete segments accordingly > are > > > 1) > > > > > > > keeping > > > > > > > > the broker simple without any querying each other about such > > > > offsets > > > > > > and > > > > > > > > does the min() calculation, rather only keeping / deleting > > > messages > > > > > > from > > > > > > > > client admin requests, and 2) allowing more generalized > > > > client-driven > > > > > > log > > > > > > > > retention policies with KIP-47 (i.e. broker is brainless and > > only > > > > > take > > > > > > > > requests while client-app can apply any customized logic to > > > > determine > > > > > > the > > > > > > > > config values of *og.retention.min.offset or > > > > > > > **og.retention.min.timestamp* > > > > > > > > that > > > > > > > > they send to the brokers). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > > > > > > > On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin < > > > becket@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi David, > > > > > > > > > > > > > > > > > > > 1. What scenario is used to this configuration? > > > > > > > > > > > > > > > > > > One scenario is stream processing pipeline. In a stream > > > > processing > > > > > > DAG, > > > > > > > > > there will be a bunch of intermediate result, we only care > > > about > > > > > the > > > > > > > > > consumer group that is in the downstream of the DAG, but > not > > > > other > > > > > > > > groups. > > > > > > > > > Ideally we want to delete the log of the intermediate > topics > > > > right > > > > > > > after > > > > > > > > > all the downstream processing jobs has successfully > processed > > > the > > > > > > > > messages. 
> > > > > > > > > In that case, we only care about the downstream processing > > > jobs, > > > > > but > > > > > > > not > > > > > > > > > other groups. That means if a down stream job did not > commit > > > > offset > > > > > > for > > > > > > > > > some reason, we want to wait for that job. Without the > > > predefined > > > > > > > > > interested group, it is hard to achieve this. > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. Yes, the configuration should be at topic level and set > > > > > > dynamically. > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Mayuresh, > > > > > > > > > > Thanks for the reply: > > > > > > > > > > 1. In the log retention check schedule, the broker first > > > find > > > > > the > > > > > > > all > > > > > > > > > the > > > > > > > > > > consumed group which are consuming this topic, and query > > the > > > > > commit > > > > > > > > > offset > > > > > > > > > > of this consumed group for the topic > > &g
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
-- Original Message --
From: "Mayuresh Gharat"
Sent: Wednesday, October 19, 2016, 10:25 AM
To: "dev"
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi David,

Thanks for the KIP.

I had some questions/suggestions:

It would be great if you could explain in the KIP, with an example, how the
min offset across all the consumers is calculated. What I meant was: it
would help to see, via pseudocode or a workflow if possible, how each broker
learns of all the consumers for a given topic-partition and how the min is
computed.

It would also be good to understand what happens if we start a console
consumer that reads from the beginning offset, commits, and then crashes
immediately. How will the segments get deleted?

Will it delete all the log segments once all the consumers have read up to
the latest offset? If yes, can we still handle the scenario where the user
wants to rewind its offset to reprocess the data, given that log.retention.ms
might not have been reached yet?

Thanks,

Mayuresh

On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin wrote:

> Hey David,
>
> Thanks for the replies to the questions.
>
> I think one major thing still not clear at this point is whether the
> brokers will apply the consumed log retention only to a specific set of
> interested consumer groups, or whether there is no such set of consumer
> groups at all.
>
> For example, for topic T, assume we know that there will be two downstream
> consumer groups CG1 and CG2 consuming data from topic T. Will we add a
> topic configuration such as
> "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T, so that
> the brokers only care about CG1 and CG2? The committed offsets of other
> groups would not be of interest and would have no impact on
> committed-offset-based log retention.
>
> It seems the current proposal does not have an "interested consumer group
> set" configuration, which means any random consumer group may affect the
> committed-offset-based log retention.
>
> I think committed-offset-based log retention is most useful in cases where
> we already know which consumer groups will be consuming from a topic, so
> we only wait for those consumer groups and ignore the others. If a topic
> will be consumed by many unknown or unpredictable consumer groups, the
> existing time-based log retention is simpler and clear enough. So I would
> argue we don't need to address the case that some groups may come later in
> committed-offset-based retention.
>
> That said, there may still be value in keeping the data for some time even
> after all the interested consumer groups have consumed the messages. For
> example, in a pipelined stream processing DAG, we may want to keep the
> data of an inter…
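Becket's "interested consumer group set" idea can be illustrated with a small sketch: the min commit offset used for retention is taken only over the groups named in the proposed topic config, so a random console consumer cannot hold back deletion. The config key comes from Becket's example above; the offset values and group names are made up:

```python
# Sketch of the "interested groups" proposal: retention considers only the
# groups listed in the (proposed, not existing) topic configuration
# log.retention.commitoffset.interested.groups. All data is illustrative.

topic_config = {"log.retention.commitoffset.interested.groups": "CG1,CG2"}

committed_offsets = {            # group -> committed offset, one partition
    "CG1": 180,
    "CG2": 150,
    "console-consumer-4711": 5,  # temporary group; must be ignored
}

interested = set(
    topic_config["log.retention.commitoffset.interested.groups"].split(",")
)
min_commit_offset = min(
    committed_offsets[g] for g in interested if g in committed_offsets
)
print(min_commit_offset)  # 150, unaffected by the console consumer at 5
```

Per Becket's point about waiting for slow downstream jobs, a real implementation would also have to treat the min as unavailable (and delete nothing) whenever an interested group has no committed offset yet, rather than skip that group.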
Overall I think the motivation is common and of interest to lots of users. I
would like to throw my two cents into this discussion:

1. Kafka topics can be used in different ways. For some categories of topics
(think: "pageView" event topics), the topic is shared among different teams /
apps within the organization, and lots of temporary consumers (for debugging,
troubleshooting, prototype development, etc.) can come and go dynamically, in
which case it is hard to track all such consumers and maintain the minimum
committed offsets. On the other hand, there is another category of topics
(think: stream-app-owned intermediate topics like
"pricing-enriched-bid-activity", as Becket mentioned above) which are owned
by only one or a few apps, and hence the consumer groups for those topics are
pre-defined and roughly static. In this case I think it makes sense to allow
such consumer-driven log retention features.

2. In this case, my question is then whether this bookkeeping of
min-committed-offsets should be done on the broker side or on the app side.
My gut feeling is that it would be better bookkept on the app (i.e. client)
side, which has the full information about the "registered consumer groups"
for certain topics and therefore knows the min-committed-offsets. A
slightly-modified KIP-47, as mentioned by Dong, could be a better fit, where
a) the app side bookkeeps the consumer-driven min offset based on the groups'
committed offsets, by either talking to the consumer clients directly or
querying the broker for the committed offsets of those registered consumer
groups, and then b) periodically writes *log.retention.min.offset* to the
broker to let it delete old segments before that offset (NOTE that the
semantics are exactly the same as in KIP-47; the only difference is that we
use an offset instead of a timestamp, which can be honored by the same
broker-side implementation of KIP-47).
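Step (a) of this app-side proposal is pure bookkeeping: take the minimum committed offset per partition across the registered groups, then hand that number to the broker in step (b). A minimal sketch; the dict shapes and function name are illustrative assumptions, not a Kafka client API:

```python
# App-side bookkeeping sketch: the app knows its registered consumer
# groups, fetches their committed offsets (stubbed here with plain dicts),
# and computes the per-partition minimum. That minimum is what it would
# periodically send to the broker as the proposed log.retention.min.offset.

def min_committed_offsets(committed):
    """committed: {group: {partition: offset}} for registered groups only.
    Returns {partition: min committed offset across those groups}."""
    mins = {}
    for group, offsets in committed.items():
        for partition, offset in offsets.items():
            if partition not in mins or offset < mins[partition]:
                mins[partition] = offset
    return mins

# Example: two registered downstream groups of an intermediate topic.
committed = {
    "CG1": {0: 120, 1: 300},
    "CG2": {0: 95, 1: 310},
}
retention_min_offset = min_committed_offsets(committed)
print(retention_min_offset)  # {0: 95, 1: 300}
```

The design point Guozhang makes is that the broker never runs this min(); it only receives the resulting offsets and deletes segments below them.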
My arguments for letting the app side bookkeep such min-offsets, and letting the brokers only take requests to delete segments accordingly, are 1) it keeps the broker simple: no brokers querying each other about such offsets and doing the min() calculation, just keeping / deleting messages in response to client admin requests, and 2) it allows more generalized client-driven log retention policies with KIP-47 (i.e. the broker is brainless and only takes requests, while the client app can apply any customized logic to determine the config values of *log.retention.min.offset* or *log.retention.min.timestamp* that it sends to the brokers). Guozhang On Sat, Oct 22, 2016 at 5:46 PM, Becket Qin wrote: > Hi David, > > > 1. What scenario is used to this configuration? > > One scenario is stream processing pipeline. In a stream processing DAG, > there will be a bunch of intermediate result, we only care about the > consumer group that is in the downstream of the DAG, but not other groups. > Ideally we want to delete the log of the intermediate topics right after > all the downstream processing jobs has successfully processed the messages. > In that case, we only care about the downstream processing jobs, but not > other groups. That means if a down stream job did not commit offset for > some reason, we want to wait for that job. Without the predefined > interested group, it is hard to achieve this. > > > 2. Yes, the configuration should be at topic level and set dynamically. > > Thanks, > > Jiangjie (Becket) Qin > > On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote: > > > Hi Mayuresh, > > Thanks for the reply: > > 1. In the log retention check schedule, the broker first find the all > the > > consumed group which are consuming this topic, and query the commit > offset > > of this consumed group for the topic > > using the OffsetFetch API. And the min commit offset is the minimal > commit > > offset between these commit offsets. > > > > > > 2. 
If the console consumer reading and commit, its commit offset will be > > used to calculate the min commit offset for this topic. > > We can avoid the random consumer using the method Becket suggested. > > > > > > 3. It will not delete the log immediately, the log will stay some time ( > > retention.commitoffset.ms), and after that we only delete > > the log segments whose offsets are less than the min commit offset. So > > the user can rewind its offset in the log.retention.ms. > > > > > > Thanks, > > David > > > > > > > > > > -- Original Message -- > > From: "Mayuresh Gharat" > > Sent: Wednesday, October 19, 2016, 10:25 AM > > To: "dev" > > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log > retention > > > > > >
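The broker-side check David describes above — delete only segments that fall entirely below the min commit offset, and only after a grace period — can be sketched as follows. This is a minimal sketch, not actual broker internals: the segment tuples, the `grace_ms` parameter (standing in for the proposed `retention.commitoffset.ms`), and the function name are all illustrative:

```python
import time

# Sketch of the consumed-log-retention check: a segment is eligible for
# deletion only if every offset in it is below the minimum committed
# offset of the consuming groups AND it has aged past a grace period
# (the proposed retention.commitoffset.ms). Names are illustrative.

def eligible_segments(segments, min_commit_offset, grace_ms, now_ms):
    """segments: list of (base_offset, next_offset, last_modified_ms)."""
    out = []
    for base, next_off, mtime in segments:
        fully_consumed = next_off <= min_commit_offset
        aged_out = (now_ms - mtime) >= grace_ms
        if fully_consumed and aged_out:
            out.append((base, next_off, mtime))
    return out

now = int(time.time() * 1000)
day = 24 * 60 * 60 * 1000
segments = [
    (0, 100, now - 3 * day),    # fully consumed and old -> delete
    (100, 200, now - 1 * day),  # fully consumed but too recent -> keep
    (200, 300, now - 3 * day),  # not fully consumed -> keep
]
out = eligible_segments(segments, min_commit_offset=200, grace_ms=2 * day, now_ms=now)
print([base for base, _, _ in out])  # [0]
```

Note how the grace period preserves the rewind window David mentions: even fully consumed segments survive until `retention.commitoffset.ms` has elapsed.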
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David, > 1. What scenario is used to this configuration? One scenario is a stream processing pipeline. In a stream processing DAG, there will be a bunch of intermediate results; we only care about the consumer groups that are downstream in the DAG, but not other groups. Ideally we want to delete the log of the intermediate topics right after all the downstream processing jobs have successfully processed the messages. In that case, we only care about the downstream processing jobs, but not other groups. That means if a downstream job did not commit an offset for some reason, we want to wait for that job. Without the predefined interested groups, it is hard to achieve this. 2. Yes, the configuration should be at topic level and set dynamically. Thanks, Jiangjie (Becket) Qin On Fri, Oct 21, 2016 at 7:40 AM, 东方甲乙 <254479...@qq.com> wrote: > Hi Mayuresh, > Thanks for the reply: > 1. In the log retention check schedule, the broker first find the all the > consumed group which are consuming this topic, and query the commit offset > of this consumed group for the topic > using the OffsetFetch API. And the min commit offset is the minimal commit > offset between these commit offsets. > > > 2. If the console consumer reading and commit, its commit offset will be > used to calculate the min commit offset for this topic. > We can avoid the random consumer using the method Becket suggested. > > > 3. It will not delete the log immediately, the log will stay some time ( > retention.commitoffset.ms), and after that we only delete > the log segments whose offsets are less than the min commit offset. So > the user can rewind its offset in the log.retention.ms. > > > Thanks, > David > > > > > -- Original Message -- > From: "Mayuresh Gharat" > Sent: Wednesday, October 19, 2016, 10:25 AM > To: "dev" > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention > > > > Hi David, > > Thanks for the KIP. 
> > I had some questions/suggestions : > > It would be great if you can explain with an example about how the min > offset for all the consumers will be calculated, in the KIP. > What I meant was, it would be great to understand with a pseudo > code/workflow if possible, how each broker knows all the consumers for the > given topic-partition and how the min is calculated. > > Also it would be good to understand what happens if we start a console > consumer which would actually start reading from the beginning offset and > commit and crash immediately. How will the segments get deleted? > > Will it delete all the log segments if all the consumers have read till > latest? If Yes, would we be able to handle a scenario were we say that user > can rewind its offset to reprocess the data since log.retention.ms might > not has reached. > > Thanks, > > Mayuresh > > On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin wrote: > > > Hey David, > > > > Thanks for replies to the questions. > > > > I think one major thing still not clear at this point is that whether the > > brokers will only apply the consumed log retention to a specific set of > > interested consumer groups, or it does not have such a set of consumer > > groups. > > > > For example, for topic T, assume we know that there will be two > downstream > > consumer groups CG1 and CG2 consuming data from topic T. Will we add a > > topic configurations such as > > "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so > that > > the brokers only care about CG1 and CG2. The committed offsets of other > > groups are not interested and won't have any impact on the committed > offset > > based log retention. > > > > It seems the current proposal does not have an "interested consumer group > > set" configuration, so that means any random consumer group may affect > the > > committed offset based log retention. 
> > > > I think the committed offset based log retention seems more useful in > cases > > where we already know which consumer groups will be consuming from this > > topic, so we will only wait for those consumer groups but ignore the > > others. If a group will be consumed by many unknown or unpredictable > > consumer groups, it seems the existing time based log retention is much > > simple and clear enough. So I would argue we don't need to address the > case > > that some groups may come later in the committed offset based retention. > > > > That said, there may still be value to keep the data for some time even > > after all the interested consumer groups have consumed the messages. Fo
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Mayuresh, Thanks for the reply: 1. During the log retention check schedule, the broker first finds all the consumer groups which are consuming this topic, and queries the committed offset of each of these groups for the topic using the OffsetFetch API. The min commit offset is then the minimum among these committed offsets. 2. If the console consumer is reading and committing, its committed offset will be used to calculate the min commit offset for this topic. We can avoid such random consumers using the method Becket suggested. 3. It will not delete the log immediately; the log will stay for some time (retention.commitoffset.ms), and after that we only delete the log segments whose offsets are less than the min commit offset. So the user can still rewind its offset within log.retention.ms. Thanks, David -- Original Message -- From: "Mayuresh Gharat" Sent: Wednesday, October 19, 2016, 10:25 AM To: "dev" Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention Hi David, Thanks for the KIP. I had some questions/suggestions: It would be great if you could explain, with an example in the KIP, how the min offset across all the consumers will be calculated. What I meant was, it would be great to understand, with pseudo code / a workflow if possible, how each broker knows all the consumers for the given topic-partition and how the min is calculated. Also it would be good to understand what happens if we start a console consumer which would actually start reading from the beginning offset, commit, and crash immediately. How will the segments get deleted? Will it delete all the log segments if all the consumers have read till latest? If yes, would we be able to handle a scenario where the user can rewind its offset to reprocess the data, since log.retention.ms might not have been reached. Thanks, Mayuresh On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin wrote: > Hey David, > > Thanks for replies to the questions. 
> > I think one major thing still not clear at this point is that whether the > brokers will only apply the consumed log retention to a specific set of > interested consumer groups, or it does not have such a set of consumer > groups. > > For example, for topic T, assume we know that there will be two downstream > consumer groups CG1 and CG2 consuming data from topic T. Will we add a > topic configurations such as > "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that > the brokers only care about CG1 and CG2. The committed offsets of other > groups are not interested and won't have any impact on the committed offset > based log retention. > > It seems the current proposal does not have an "interested consumer group > set" configuration, so that means any random consumer group may affect the > committed offset based log retention. > > I think the committed offset based log retention seems more useful in cases > where we already know which consumer groups will be consuming from this > topic, so we will only wait for those consumer groups but ignore the > others. If a group will be consumed by many unknown or unpredictable > consumer groups, it seems the existing time based log retention is much > simple and clear enough. So I would argue we don't need to address the case > that some groups may come later in the committed offset based retention. > > That said, there may still be value to keep the data for some time even > after all the interested consumer groups have consumed the messages. For > example, in a pipelined stream processing DAG, we may want to keep the data > of an intermediate topic for some time in case the job fails. So we can > resume from a previously succeeded stage instead of restart the entire > pipeline. Or we can use the intermediate topic for some debugging work. 
> > Thanks, > > Jiangjie (Becket) Qin > > > > On Sun, Oct 16, 2016 at 2:15 AM, <254479...@qq.com> wrote: > > > Hi Dong, > > The KIP is used to solve both these 2 cases, we specify a small > > consumed log retention time to deleted the consumed data and avoid losing > > un-consumed data. > > And the specify a large force log retention time used as higher bound for > > the data. I will update the KIP for this info. > > Another solution I think may be ok is to support an API to delete the > > inactive group? If the group is in inactive, but it's commit offset is > > also in the __commit_offsets topic and > > stay in the offset cache, we can delete it via this API. > > > > > > Thanks, > > David > > > > > > -- Original Message -- > > From: "Dong Lin" > > Sent: 2016
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Becket, It seems a good idea to restrict a topic's consumed log retention to some specified groups, with the default being all consuming groups; it is more flexible. I have some comments on this: 1. What scenario is this configuration used for? 2. I think we can only support this configuration at the topic level, so that all the brokers will do the same consumed log retention for the specified groups. Otherwise, some brokers would behave differently for the same topic. And if this configuration were broker-level, we would have to add the group-to-topic mapping relationship too, since a specified consumer group may not consume all the topics on a broker. Thanks, David -- Original Message -- From: "Becket Qin" Sent: Monday, October 17, 2016, 3:27 To: "dev" Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention Hey David, Thanks for replies to the questions. I think one major thing still not clear at this point is that whether the brokers will only apply the consumed log retention to a specific set of interested consumer groups, or it does not have such a set of consumer groups. For example, for topic T, assume we know that there will be two downstream consumer groups CG1 and CG2 consuming data from topic T. Will we add a topic configurations such as "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that the brokers only care about CG1 and CG2. The committed offsets of other groups are not interested and won't have any impact on the committed offset based log retention. It seems the current proposal does not have an "interested consumer group set" configuration, so that means any random consumer group may affect the committed offset based log retention. I think the committed offset based log retention seems more useful in cases where we already know which consumer groups will be consuming from this topic, so we will only wait for those consumer groups but ignore the others. 
If a group will be consumed by many unknown or unpredictable consumer groups, it seems the existing time based log retention is much simple and clear enough. So I would argue we don't need to address the case that some groups may come later in the committed offset based retention. That said, there may still be value to keep the data for some time even after all the interested consumer groups have consumed the messages. For example, in a pipelined stream processing DAG, we may want to keep the data of an intermediate topic for some time in case the job fails. So we can resume from a previously succeeded stage instead of restart the entire pipeline. Or we can use the intermediate topic for some debugging work. Thanks, Jiangjie (Becket) Qin On Sun, Oct 16, 2016 at 2:15 AM, <254479...@qq.com> wrote: > Hi Dong, > The KIP is used to solve both these 2 cases, we specify a small > consumed log retention time to deleted the consumed data and avoid losing > un-consumed data. > And the specify a large force log retention time used as higher bound for > the data. I will update the KIP for this info. > Another solution I think may be ok is to support an API to delete the > inactive group? If the group is in inactive, but it's commit offset is > also in the __commit_offsets topic and > stay in the offset cache, we can delete it via this API. > > > Thanks, > David > > > -- Original Message ---------- > From: "Dong Lin" > Sent: Friday, October 14, 2016, 5:01 AM > To: "dev" > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention > > > > Hi David, > > As explained in the motivation section of the KIP, the problem is that if > log retention is too small, we may lose data; and if log retention is too > large, then we waste disk space. Therefore, we need to solve one of the two > problems -- allow data to be persisted longer for consumption if log retention is set too small, or allow data to be expired earlier if log > retention is too large. 
I think the KIP probably needs to make this clear > and explain which one is rejected and why. Note that the choice of the two > affects the solution -- if we want to address the first problem then > log.retention.ms should be used as lower bound on the actual retention > time, and if we want to address the second problem then the > log.retention.ms > should be used as higher bound on the actual retention time. > > In both cases, we probably need to figure out a way to determine "active > consumer group". Maybe we can compare the time-since-last-commit against a > threshold to determine this. In addition, the threshold can be overridden > either per-topic or per-groupId. If we go along this route, the
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David, Thanks for the KIP. I had some questions/suggestions: It would be great if you could explain, with an example in the KIP, how the min offset across all the consumers will be calculated. What I meant was, it would be great to understand, with pseudo code / a workflow if possible, how each broker knows all the consumers for the given topic-partition and how the min is calculated. Also it would be good to understand what happens if we start a console consumer which would actually start reading from the beginning offset, commit, and crash immediately. How will the segments get deleted? Will it delete all the log segments if all the consumers have read till latest? If yes, would we be able to handle a scenario where the user can rewind its offset to reprocess the data, since log.retention.ms might not have been reached. Thanks, Mayuresh On Mon, Oct 17, 2016 at 12:27 AM, Becket Qin wrote: > Hey David, > > Thanks for replies to the questions. > > I think one major thing still not clear at this point is that whether the > brokers will only apply the consumed log retention to a specific set of > interested consumer groups, or it does not have such a set of consumer > groups. > > For example, for topic T, assume we know that there will be two downstream > consumer groups CG1 and CG2 consuming data from topic T. Will we add a > topic configurations such as > "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that > the brokers only care about CG1 and CG2. The committed offsets of other > groups are not interested and won't have any impact on the committed offset > based log retention. > > It seems the current proposal does not have an "interested consumer group > set" configuration, so that means any random consumer group may affect the > committed offset based log retention. 
> > I think the committed offset based log retention seems more useful in cases > where we already know which consumer groups will be consuming from this > topic, so we will only wait for those consumer groups but ignore the > others. If a group will be consumed by many unknown or unpredictable > consumer groups, it seems the existing time based log retention is much > simple and clear enough. So I would argue we don't need to address the case > that some groups may come later in the committed offset based retention. > > That said, there may still be value to keep the data for some time even > after all the interested consumer groups have consumed the messages. For > example, in a pipelined stream processing DAG, we may want to keep the data > of an intermediate topic for some time in case the job fails. So we can > resume from a previously succeeded stage instead of restart the entire > pipeline. Or we can use the intermediate topic for some debugging work. > > Thanks, > > Jiangjie (Becket) Qin > > > > On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <254479...@qq.com> wrote: > > > Hi Dong, > > The KIP is used to solve both these 2 cases, we specify a small > > consumed log retention time to deleted the consumed data and avoid losing > > un-consumed data. > > And the specify a large force log retention time used as higher bound for > > the data. I will update the KIP for this info. > > Another solution I think may be ok is to support an API to delete the > > inactive group? If the group is in inactive, but it's commit offset is > > also in the __commit_offsets topic and > > stay in the offset cache, we can delete it via this API. 
> > > > > > Thanks, > > David > > > > > > -- Original Message -- > > From: "Dong Lin" > > Sent: Friday, October 14, 2016, 5:01 AM > > To: "dev" > > > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log > retention > > > > > > > > Hi David, > > > > As explained in the motivation section of the KIP, the problem is that if > > log retention is too small, we may lose data; and if log retention is too > > large, then we waste disk space. Therefore, we need to solve one of the > two > > problems -- allow data to be persisted longer for consumption if log > > retention is set too small, or allow data to be expired earlier if log > > retention is too large. I think the KIP probably needs to make this clear > > and explain which one is rejected and why. Note that the choice of the > two > > affects the solution -- if we want to address the first problem then > > log.retention.ms should be used as lower bound on the actual retention > > time, and if we want to address the second problem then the > > log.retention.ms > > should be used
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hey David, Thanks for replies to the questions. I think one major thing still not clear at this point is that whether the brokers will only apply the consumed log retention to a specific set of interested consumer groups, or it does not have such a set of consumer groups. For example, for topic T, assume we know that there will be two downstream consumer groups CG1 and CG2 consuming data from topic T. Will we add a topic configurations such as "log.retention.commitoffset.interested.groups=CG1,CG2" to topic T so that the brokers only care about CG1 and CG2. The committed offsets of other groups are not interested and won't have any impact on the committed offset based log retention. It seems the current proposal does not have an "interested consumer group set" configuration, so that means any random consumer group may affect the committed offset based log retention. I think the committed offset based log retention seems more useful in cases where we already know which consumer groups will be consuming from this topic, so we will only wait for those consumer groups but ignore the others. If a group will be consumed by many unknown or unpredictable consumer groups, it seems the existing time based log retention is much simple and clear enough. So I would argue we don't need to address the case that some groups may come later in the committed offset based retention. That said, there may still be value to keep the data for some time even after all the interested consumer groups have consumed the messages. For example, in a pipelined stream processing DAG, we may want to keep the data of an intermediate topic for some time in case the job fails. So we can resume from a previously succeeded stage instead of restart the entire pipeline. Or we can use the intermediate topic for some debugging work. 
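Becket's "interested consumer groups" idea above can be sketched in a few lines. This is a minimal sketch under stated assumptions: `log.retention.commitoffset.interested.groups` is the proposed (not existing) topic config from the discussion, and both helper functions are hypothetical:

```python
# Sketch of the "interested consumer groups" idea: only the groups named
# in the proposed topic config influence committed-offset-based
# retention; random groups (e.g. a throwaway console consumer) are
# ignored. Function names are illustrative, not broker internals.

def parse_interested_groups(config):
    raw = config.get("log.retention.commitoffset.interested.groups", "")
    return {g.strip() for g in raw.split(",") if g.strip()}

def retention_min_offset(all_committed, interested):
    """all_committed: group id -> committed offset for this partition."""
    relevant = {g: o for g, o in all_committed.items() if g in interested}
    if set(relevant) != interested:
        return None  # an interested group has not committed yet: wait
    return min(relevant.values())

config = {"log.retention.commitoffset.interested.groups": "CG1,CG2"}
interested = parse_interested_groups(config)
# The console consumer's commit is present but ignored for retention.
committed = {"CG1": 500, "CG2": 350, "console-consumer-1234": 10}
print(retention_min_offset(committed, interested))  # 350
```

This also shows why the config addresses Mayuresh's crash-and-commit console-consumer concern: an unlisted group cannot drag the retention offset down (or hold deletion up).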
Thanks, Jiangjie (Becket) Qin On Sun, Oct 16, 2016 at 2:15 AM, 东方甲乙 <254479...@qq.com> wrote: > Hi Dong, > The KIP is used to solve both these 2 cases, we specify a small > consumed log retention time to deleted the consumed data and avoid losing > un-consumed data. > And the specify a large force log retention time used as higher bound for > the data. I will update the KIP for this info. > Another solution I think may be ok is to support an API to delete the > inactive group? If the group is in inactive, but it's commit offset is > also in the __commit_offsets topic and > stay in the offset cache, we can delete it via this API. > > > Thanks, > David > > > -- Original Message ---------- > From: "Dong Lin" > Sent: Friday, October 14, 2016, 5:01 AM > To: "dev" > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention > > > > Hi David, > > As explained in the motivation section of the KIP, the problem is that if > log retention is too small, we may lose data; and if log retention is too > large, then we waste disk space. Therefore, we need to solve one of the two > problems -- allow data to be persisted longer for consumption if log > retention is set too small, or allow data to be expired earlier if log > retention is too large. I think the KIP probably needs to make this clear > and explain which one is rejected and why. Note that the choice of the two > affects the solution -- if we want to address the first problem then > log.retention.ms should be used as lower bound on the actual retention > time, and if we want to address the second problem then the > log.retention.ms > should be used as higher bound on the actual retention time. > > In both cases, we probably need to figure out a way to determine "active > consumer group". Maybe we can compare the time-since-last-commit against a > threshold to determine this. In addition, the threshold can be overridden > either per-topic or per-groupId. If we go along this route, the rejected > solution (per-topic vs. 
per-groupId) should probably be explained in the > KIP. > > > Thanks, > Dong > > > > On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin wrote: > > > Hi David, > > > > Thanks for your explanation. There still seems to be issue with this > > solution. Please see my comment inline. > > > > > > On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote: > > > >> Hi Dong, > >> Sorry for the delay, here are the comments: > >> 1.I think we should distinguish these two cases: > >> (1) group has no member, but has commit offset : In this case we should > >> consider its commit offset > >> (2) group has no member, no commit offset: Skip this group > >> Is it ok? > >> > >> > >> ListGroup API can list the groups, but this API only show the Online >
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Dong, The KIP is meant to solve both of these 2 cases: we specify a small consumed log retention time to delete the consumed data while avoiding losing un-consumed data, and we specify a large forced log retention time used as an upper bound for the data. I will update the KIP with this info. Another solution I think may be OK is to support an API to delete an inactive group. If the group is inactive but its commit offset is still in the __consumer_offsets topic and stays in the offset cache, we can delete it via this API. Thanks, David -- Original Message -- From: "Dong Lin" Sent: Friday, October 14, 2016, 5:01 AM To: "dev" Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention Hi David, As explained in the motivation section of the KIP, the problem is that if log retention is too small, we may lose data; and if log retention is too large, then we waste disk space. Therefore, we need to solve one of the two problems -- allow data to be persisted longer for consumption if log retention is set too small, or allow data to be expired earlier if log retention is too large. I think the KIP probably needs to make this clear and explain which one is rejected and why. Note that the choice of the two affects the solution -- if we want to address the first problem then log.retention.ms should be used as lower bound on the actual retention time, and if we want to address the second problem then the log.retention.ms should be used as higher bound on the actual retention time. In both cases, we probably need to figure out a way to determine "active consumer group". Maybe we can compare the time-since-last-commit against a threshold to determine this. In addition, the threshold can be overridden either per-topic or per-groupId. If we go along this route, the rejected solution (per-topic vs. per-groupId) should probably be explained in the KIP. Thanks, Dong On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin wrote: > Hi David, > > Thanks for your explanation. 
There still seems to be issue with this > solution. Please see my comment inline. > > > On Thu, Oct 13, 2016 at 8:46 AM, <254479...@qq.com> wrote: > >> Hi Dong, >> Sorry for the delay, here are the comments: >> 1.I think we should distinguish these two cases: >> (1) group has no member, but has commit offset : In this case we should >> consider its commit offset >> (2) group has no member, no commit offset: Skip this group >> Is it ok? >> >> >> ListGroup API can list the groups, but this API only show the Online >> Group, so we should enhance the listGroup API to list those groups in the >> case (1) >> >> Say some user starts a consumer to consume topic A with > enable.auto.commit = true. Later they change the group name in the config. > Then the proposed solution will never execute consumed log retention for > the topic A, right? I think group name change is pretty common and we > should take care of this issue. One possible solution is to add a config to > specify the maximum time since last offset commit before we consider a > group is inactive. > > >> >> 2. Because every consumer group may appear in different time, say, group >> 1 start to consume in day 1, group 2 start to consume in day 2. If we >> delete the log segment right away, >> group 2 can not consume these message. So we hope the messages can hold >> for a specified time. I think many use-cases will need there configs, if >> there are many consumer groups. >> >> > If we want to take care of group 2, can we simply disable consumed log > retention for the topic and set log retention to 1 day? Can you explain the > benefit of enabling consumed log retention and set consumed log retention > to 1 day? > > Currently the flow graph in the KIP suggests that we delete data iff > (consumed log retention is triggered OR forced log retention is triggered). 
> And alternative solution is to delete data iff ( (consumed log retention is > disabled OR consumed log retention is triggered) AND forced log retention > is triggered). I would argue that the 2nd scheme is better. Say the > consumed log retention is enabled. The 1st scheme basically interprets > forced log retention as the upper bound of the time the data can stay in > Kafka. The 2nd scheme interprets forced log retention as the lower bound of > the time the data can stay in Kafka, which is more consistent with the > purpose of having this forced log retention (to save disk space). And if we > adopt the 2nd solution, the use-case you suggested can be easily addressed > by setting forced log retention to 1 day and enable consumed log retention. > What do you think? > > > >
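The two deletion rules Dong contrasts above can be captured as two small predicates. This is a sketch of the boolean logic only; the trigger inputs (whether consumed / forced retention has fired for a segment) are assumed to be computed elsewhere:

```python
# Scheme 1 (current KIP flow graph): delete iff the consumed log
# retention is triggered OR the forced log retention is triggered --
# forced retention acts as an UPPER bound on how long data lives.
def should_delete_scheme1(consumed_triggered, forced_triggered):
    return consumed_triggered or forced_triggered

# Scheme 2 (Dong's alternative): delete iff (consumed retention is
# disabled OR triggered) AND forced retention is triggered -- forced
# retention acts as a LOWER bound on how long data lives.
def should_delete_scheme2(consumed_enabled, consumed_triggered, forced_triggered):
    consumed_ok = (not consumed_enabled) or consumed_triggered
    return consumed_ok and forced_triggered

# A segment already consumed by every interested group but younger than
# the forced retention: scheme 1 deletes it early, scheme 2 keeps it.
print(should_delete_scheme1(True, False))        # True
print(should_delete_scheme2(True, True, False))  # False
```

The divergence shown in the example is exactly the disagreement in the thread: whether consuming everything should allow deletion before the forced retention elapses (scheme 1) or only permit deletion once it has (scheme 2).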
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Dong, Thanks for the comments: 1. Say some user starts a consumer to consume topic A with enable.auto.commit = true. Later they change the group name in the config. Then the proposed solution will never execute consumed log retention for the topic A, right? I think group name change is pretty common and we should take care of this issue. One possible solution is to add a config to specify the maximum time since last offset commit before we consider a group is inactive. If the group name changed, we don't know whether the old group name will be reused or not. If the group's commit offset is discarded after a specified time, why not use a commit-offset timeout? Then the inactive group will actually be removed. 2. If we want to take care of group 2, can we simply disable consumed log retention for the topic and set log retention to 1 day? Can you explain the benefit of enabling consumed log retention and set consumed log retention to 1 day? If we want to take care of group 2, which starts consuming on day 2, we should at least keep the messages from day 2, e.g. by setting log.retention.commitoffset to 3 days. Currently the flow graph in the KIP suggests that we delete data iff (consumed log retention is triggered OR forced log retention is triggered). And alternative solution is to delete data iff ( (consumed log retention is disabled OR consumed log retention is triggered) AND forced log retention is triggered). I would argue that the 2nd scheme is better. Say the consumed log retention is enabled. The 1st scheme basically interprets forced log retention as the upper bound of the time the data can stay in Kafka. The 2nd scheme interprets forced log retention as the lower bound of the time the data can stay in Kafka, which is more consistent with the purpose of having this forced log retention (to save disk space). 
And if we adopt the 2nd solution, the use-case you suggested can be easily addressed by setting forced log retention to 1 day and enabling consumed log retention. What do you think? Yes, that is the KIP's suggestion: the consumed log retention runs before the forced log retention, and both can be triggered. For my use-case, the forced log retention should be larger than the consumed log retention, say 7 days, with consumed log retention set to 3 days, so after 3 days we can quickly clean the messages which are consumed. And after 7 days, even if the messages are not consumed, we will clean them anyway. -- Original Message -- From: "Dong Lin" Sent: October 14, 2016, 1:23 To: "dev" Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention Hi David, Thanks for your explanation. There still seems to be issue with this solution. Please see my comment inline. On Thu, Oct 13, 2016 at 8:46 AM, <254479...@qq.com> wrote: > Hi Dong, > Sorry for the delay, here are the comments: > 1.I think we should distinguish these two cases: > (1) group has no member, but has commit offset : In this case we should > consider its commit offset > (2) group has no member, no commit offset: Skip this group > Is it ok? > > > ListGroup API can list the groups, but this API only show the Online > Group, so we should enhance the listGroup API to list those groups in the > case (1) > > Say some user starts a consumer to consume topic A with enable.auto.commit = true. Later they change the group name in the config. Then the proposed solution will never execute consumed log retention for the topic A, right? I think group name change is pretty common and we should take care of this issue. One possible solution is to add a config to specify the maximum time since last offset commit before we consider a group is inactive. > > 2. Because every consumer group may appear in different time, say, group 1 > start to consume in day 1, group 2 start to consume in day 2. 
If we delete > the log segment right away, > group 2 can not consume these message. So we hope the messages can hold > for a specified time. I think many use-cases will need there configs, if > there are many consumer groups. > > If we want to take care of group 2, can we simply disable consumed log retention for the topic and set log retention to 1 day? Can you explain the benefit of enabling consumed log retention and set consumed log retention to 1 day? Currently the flow graph in the KIP suggests that we delete data iff (consumed log retention is triggered OR forced log retention is triggered). And alternative solution is to delete data iff ( (consumed log retention is disabled OR consumed log retention is triggered) AND forced log retention is triggered). I would argue that the 2nd scheme is better. Say the consumed log retention is enabled. The 1st scheme basically interprets forced log retention as the upper bound of the time the data
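The two deletion schemes debated above can be written as boolean predicates. This is an illustrative sketch only; the function names and signatures are hypothetical, since the KIP describes this as a flow graph rather than code:

```python
# Hypothetical sketch of the two deletion schemes discussed in the thread.

def scheme1_should_delete(consumed_enabled, consumed_triggered, forced_triggered):
    """Current KIP flow graph: delete iff consumed log retention is
    triggered OR forced log retention is triggered. Forced retention then
    acts as an UPPER bound on how long data stays in Kafka."""
    return (consumed_enabled and consumed_triggered) or forced_triggered

def scheme2_should_delete(consumed_enabled, consumed_triggered, forced_triggered):
    """Alternative scheme: delete iff (consumed retention is disabled OR
    triggered) AND forced retention is triggered. Forced retention then
    acts as a LOWER bound on how long data stays in Kafka."""
    return (not consumed_enabled or consumed_triggered) and forced_triggered
```

The difference shows when a segment is fully consumed before the forced retention expires: scheme 1 deletes it immediately, while scheme 2 keeps it until the forced retention also triggers.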
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Renu: Sorry for the delay, here are the comments: 1. You mean the config for the topic? We also support per-topic consumed retention configuration. 2. The consumer group's commit offset timeout is supported since 0.9; the consumed retention only concerns the current commit offset. Log retention will not stop even after the consumer disappears. 3. You can set it to a very small time, for example 1ms, so the log will be deleted right after it is consumed. Setting it to 0 currently throws an error. 4. The log retention timeout will not change; it only depends on the current time minus the last modified time of the log file. When a new consumer comes, the consumed retention process will find the new min commit offset. Thanks, David -- Original Message -- From: "Renu Tewari" Sent: October 11, 2016, 4:55 To: "dev" Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention Hi David This is a very timely KIP given the number of use cases in the streams processing pipeline that need consumed log retention management. I just wanted to make sure some questions that Becket and Dong asked are described in the KIP. 1. How is the configuration setup per topic to know what is the set of consumer groups that are "subscribed" to this topic whose committed offsets will be tracked. Can we have more details on how this will be dynamically tracked as consumers come and go. 2. Is there a timeout to determine if a consumer group has stopped committing offsets to topic partitions that they had earlier consumed? Or the consumed log retention will track each known consumer/consumers groups committed offset and stop any cleaning if a consumer disappears after consuming. This is to Dong's earlier question. 3. Can the log.retention value be set to 0 to indicate the log is set to be cleaned to the min committed offset immediately after it has been consumed? 4. What guarantee is given on when the consumed log will eventually be cleaned. 
If the log.retention timeout is enabled for a consumed offset and a new consumer starts consuming from the beginning then is the min committed offset value changed and the timer based on log.retention timeout restarted? This kind of all relates to active and inactive consumers and if the set changes dynamically how does the consumed log retention actually make progress. regards renu On Mon, Oct 10, 2016 at 1:05 AM, Dong Lin wrote: > Hey David, > > Thanks for reply. Please see comment inline. > > On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) > wrote: > > > Hi Dong > >Thanks for the questions: > > > > 1. Now we don't distinguish inactive or active groups. Because in some > > case maybe inactive group will become active again, and using the > previous > > commit offset. > > > > So we will not delete the log segment in the consumer retention if there > > are some groups consume but not commit, but the log segment can be > delete by > > the force retention. > > > > So in the example I provided, the consumed log retention will be > effectively disabled, right? This seems to be a real problem in operation > -- we don't want log retention to be un-intentionally disabled simply > because someone start a tool to consume from that topic. Either this KIP > should provide a way to handle this, or there should be a way for operator > to be aware of such case and be able to re-eanble consumed log retention > for the topic. What do you think? > > > > > 2. These configs are used to determine the out of date time of the > > consumed retention, like the parameters of the force retention > > (log.retention.hours, log.retention.minutes, log.retention.ms). For > > example, users want the save the log for 3 days, after 3 days, kafka will > > delete the log segments which are > > > > consumed by all the consumer group. The log retention thread need these > > parameters. 
> > > > It makes sense to have configs such as log.retention.ms -- it is used to > make data available for up to a configured amount of time before it is > deleted. My question is what is the use-case for making log available for > another e.g. 3 days after it has been consumed by all consumer groups. The > purpose of this KIP is to allow log to be deleted right as long as all > interested consumer groups have consumed it. Can you provide a use-case for > keeping log available for longer time after it has been consumed by all > groups? > > > > > > Thanks, > > David > > > > > > > Hey David, > > > > > > Thanks for the KIP. Can you help with the following two questions: > > > > > > 1) If someone start a consumer (e.g. kafka-con
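As a concrete sketch of the interplay being discussed here — a consumed retention window plus a forced retention window as backstop — the 3-day and 7-day values echo David's example elsewhere in the thread, and all names and defaults below are illustrative assumptions, not settings defined by the KIP:

```python
DAY_MS = 24 * 60 * 60 * 1000

def segment_expired(age_ms, fully_consumed,
                    consumed_retention_ms=3 * DAY_MS,  # illustrative value
                    forced_retention_ms=7 * DAY_MS):   # illustrative value
    """A segment is removed by consumed retention once all consumer groups
    have consumed it AND it is older than the consumed retention window;
    forced retention removes it after 7 days regardless. Age is measured
    from the segment file's last-modified time, per David's answer above."""
    if fully_consumed and age_ms > consumed_retention_ms:
        return True
    return age_ms > forced_retention_ms
```

So a fully consumed segment is cleaned after 3 days, while an unconsumed one is still cleaned after 7 days, which is the behavior David describes for his use case.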
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David, As explained in the motivation section of the KIP, the problem is that if log retention is too small, we may lose data; and if log retention is too large, then we waste disk space. Therefore, we need to solve one of the two problems -- allow data to be persisted longer for consumption if log retention is set too small, or allow data to be expired earlier if log retention is too large. I think the KIP probably needs to make this clear and explain which one is rejected and why. Note that the choice of the two affects the solution -- if we want to address the first problem then log.retention.ms should be used as lower bound on the actual retention time, and if we want to address the second problem then the log.retention.ms should be used as upper bound on the actual retention time. In both cases, we probably need to figure out a way to determine "active consumer group". Maybe we can compare the time-since-last-commit against a threshold to determine this. In addition, the threshold can be overridden either per-topic or per-groupId. If we go along this route, the rejected solution (per-topic vs. per-groupId) should probably be explained in the KIP. Thanks, Dong On Thu, Oct 13, 2016 at 10:23 AM, Dong Lin wrote: > Hi David, > > Thanks for your explanation. There still seems to be issue with this > solution. Please see my comment inline. > > > On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote: > >> Hi Dong, >> Sorry for the delay, here are the comments: >> 1.I think we should distinguish these two cases: >> (1) group has no member, but has commit offset : In this case we should >> consider its commit offset >> (2) group has no member, no commit offset: Skip this group >> Is it ok? >> >> >> ListGroup API can list the groups, but this API only show the Online >> Group, so we should enhance the listGroup API to list those groups in the >> case (1) >> >> Say some user starts a consumer to consume topic A with > enable.auto.commit = true. 
Later they change the group name in the config. > Then the proposed solution will never execute consumed log retention for > the topic A, right? I think group name change is pretty common and we > should take care of this issue. One possible solution is to add a config to > specify the maximum time since last offset commit before we consider a > group is inactive. > > >> >> 2. Because every consumer group may appear in different time, say, group >> 1 start to consume in day 1, group 2 start to consume in day 2. If we >> delete the log segment right away, >> group 2 can not consume these message. So we hope the messages can hold >> for a specified time. I think many use-cases will need there configs, if >> there are many consumer groups. >> >> > If we want to take care of group 2, can we simply disable consumed log > retention for the topic and set log retention to 1 day? Can you explain the > benefit of enabling consumed log retention and set consumed log retention > to 1 day? > > Currently the flow graph in the KIP suggests that we delete data iff > (consumed log retention is triggered OR forced log retention is triggered). > And alternative solution is to delete data iff ( (consumed log retention is > disabled OR consumed log retention is triggered) AND forced log retention > is triggered). I would argue that the 2nd scheme is better. Say the > consumed log retention is enabled. The 1st scheme basically interprets > forced log retention as the upper bound of the time the data can stay in > Kafka. The 2nd scheme interprets forced log retention as the lower bound of > the time the data can stay in Kafka, which is more consistent with the > purpose of having this forced log retention (to save disk space). And if we > adopt the 2nd solution, the use-case you suggested can be easily addressed > by setting forced log retention to 1 day and enable consumed log retention. > What do you think? 
> > > >> >> Thanks, >> David >> >> >> >> >> -- Original Message -- >> From: "Dong Lin";; >> Sent: October 10, 2016 (Monday) 4:05 PM >> To: "dev"; >> >> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention >> >> >> >> Hey David, >> >> Thanks for reply. Please see comment inline. >> >> On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) >> wrote: >> >> > Hi Dong >> >Thanks for the questions: >> > >> > 1. Now we don't distinguish inactive or active groups. Because in some >> > case maybe inactive group will become active again, and using the >> previous
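Dong's suggestion of comparing time-since-last-commit against a threshold to decide group inactivity could look roughly like the sketch below. The data layout and threshold value are assumptions for illustration; the KIP does not define them:

```python
def active_group_offsets(group_commits, now_ms, inactive_timeout_ms):
    """Filter out consumer groups whose last offset commit is older than
    the inactivity threshold, so an abandoned group (e.g. a one-off
    console-consumer group that never commits again) no longer holds back
    consumed log retention.

    group_commits maps group id -> (committed_offset, last_commit_ts_ms).
    Returns the committed offsets of groups still considered active.
    """
    return {
        group: offset
        for group, (offset, last_commit_ts) in group_commits.items()
        if now_ms - last_commit_ts <= inactive_timeout_ms
    }
```

As Dong notes, the threshold could plausibly be overridden per-topic or per-groupId; that choice affects where this filter would be configured.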
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David, Thanks for your explanation. There still seems to be issue with this solution. Please see my comment inline. On Thu, Oct 13, 2016 at 8:46 AM, 东方甲乙 <254479...@qq.com> wrote: > Hi Dong, > Sorry for the delay, here are the comments: > 1.I think we should distinguish these two cases: > (1) group has no member, but has commit offset : In this case we should > consider its commit offset > (2) group has no member, no commit offset: Skip this group > Is it ok? > > > ListGroup API can list the groups, but this API only show the Online > Group, so we should enhance the listGroup API to list those groups in the > case (1) > > Say some user starts a consumer to consume topic A with enable.auto.commit = true. Later they change the group name in the config. Then the proposed solution will never execute consumed log retention for the topic A, right? I think group name change is pretty common and we should take care of this issue. One possible solution is to add a config to specify the maximum time since last offset commit before we consider a group is inactive. > > 2. Because every consumer group may appear in different time, say, group 1 > start to consume in day 1, group 2 start to consume in day 2. If we delete > the log segment right away, > group 2 can not consume these message. So we hope the messages can hold > for a specified time. I think many use-cases will need there configs, if > there are many consumer groups. > > If we want to take care of group 2, can we simply disable consumed log retention for the topic and set log retention to 1 day? Can you explain the benefit of enabling consumed log retention and set consumed log retention to 1 day? Currently the flow graph in the KIP suggests that we delete data iff (consumed log retention is triggered OR forced log retention is triggered). And alternative solution is to delete data iff ( (consumed log retention is disabled OR consumed log retention is triggered) AND forced log retention is triggered). 
I would argue that the 2nd scheme is better. Say the consumed log retention is enabled. The 1st scheme basically interprets forced log retention as the upper bound of the time the data can stay in Kafka. The 2nd scheme interprets forced log retention as the lower bound of the time the data can stay in Kafka, which is more consistent with the purpose of having this forced log retention (to save disk space). And if we adopt the 2nd solution, the use-case you suggested can be easily addressed by setting forced log retention to 1 day and enabling consumed log retention. What do you think? > > Thanks, > David > > > > > -- Original Message -------------- > From: "Dong Lin";; > Sent: October 10, 2016 (Monday) 4:05 PM > To: "dev"; > > Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention > > > > Hey David, > > Thanks for reply. Please see comment inline. > > On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) > wrote: > > > Hi Dong > >Thanks for the questions: > > > > 1. Now we don't distinguish inactive or active groups. Because in some > > case maybe inactive group will become active again, and using the > previous > > commit offset. > > > > So we will not delete the log segment in the consumer retention if there > > are some groups consume but not commit, but the log segment can be > delete by > > the force retention. > > > > So in the example I provided, the consumed log retention will be > effectively disabled, right? This seems to be a real problem in operation > -- we don't want log retention to be un-intentionally disabled simply > because someone starts a tool to consume from that topic. Either this KIP > should provide a way to handle this, or there should be a way for operator > to be aware of such case and be able to re-enable consumed log retention > for the topic. What do you think? > > > > > 2. 
These configs are used to determine the out of date time of the > > consumed retention, like the parameters of the force retention > > (log.retention.hours, log.retention.minutes, log.retention.ms). For > > example, users want the save the log for 3 days, after 3 days, kafka will > > delete the log segments which are > > > > consumed by all the consumer group. The log retention thread need these > > parameters. > > > > It makes sense to have configs such as log.retention.ms -- it is used to > make data available for up to a configured amount of time before it is > deleted. My question is what is the use-case for making log available for > another e.g. 3 days after it has been consumed by all consumer groups. The > purpose of this KIP is to allow log to be deleted right as
RE: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Dong, Sorry for the delay, here are the comments: 1. I think we should distinguish these two cases: (1) group has no member, but has commit offset: In this case we should consider its commit offset (2) group has no member, no commit offset: Skip this group Is it ok? ListGroup API can list the groups, but this API only shows the online groups, so we should enhance the ListGroup API to list those groups in the case (1) 2. Because every consumer group may appear at different times, say, group 1 starts to consume in day 1, group 2 starts to consume in day 2. If we delete the log segment right away, group 2 cannot consume these messages. So we hope the messages can be held for a specified time. I think many use-cases will need these configs, if there are many consumer groups. Thanks, David -- Original Message -- From: "Dong Lin" Sent: October 10, 2016 (Monday) 4:05 PM To: "dev" Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention Hey David, Thanks for reply. Please see comment inline. On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) wrote: > Hi Dong >Thanks for the questions: > > 1. Now we don't distinguish inactive or active groups. Because in some > case maybe inactive group will become active again, and using the previous > commit offset. > > So we will not delete the log segment in the consumer retention if there > are some groups consume but not commit, but the log segment can be delete by > the force retention. > So in the example I provided, the consumed log retention will be effectively disabled, right? This seems to be a real problem in operation -- we don't want log retention to be un-intentionally disabled simply because someone starts a tool to consume from that topic. Either this KIP should provide a way to handle this, or there should be a way for operator to be aware of such case and be able to re-enable consumed log retention for the topic. What do you think? > 2. 
These configs are used to determine the out of date time of the > consumed retention, like the parameters of the force retention > (log.retention.hours, log.retention.minutes, log.retention.ms). For > example, users want the save the log for 3 days, after 3 days, kafka will > delete the log segments which are > > consumed by all the consumer group. The log retention thread need these > parameters. > > It makes sense to have configs such as log.retention.ms -- it is used to make data available for up to a configured amount of time before it is deleted. My question is what is the use-case for making log available for another e.g. 3 days after it has been consumed by all consumer groups. The purpose of this KIP is to allow log to be deleted right as long as all interested consumer groups have consumed it. Can you provide a use-case for keeping log available for longer time after it has been consumed by all groups? > > Thanks, > David > > > > Hey David, > > > > Thanks for the KIP. Can you help with the following two questions: > > > > 1) If someone start a consumer (e.g. kafka-console-consumer) to consume a > > topic for debug/validation purpose, a randome consumer group may be > created > > and offset may be committed for this consumer group. If no offset commit > is > > made for this consumer group in the future, will this effectively > > disable consumed log retention for this topic? In other words, how do > this > > KIP distinguish active consumer group from inactive ones? > > > > 2) Why do we need new configs such as log.retention.commitoffset.hours? > Can > >we simply delete log segments if consumed log retention is enabled for > this > > topic and all consumer groups have consumed messages in the log segment? > > > > Thanks, > > Dong > > > > > > > >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) > wrote: > > > > > Hi Becket, > > > > > > Thanks for the feedback: > > > 1. 
We use the simple consumer api to query the commit offset, so we > don't > > > need to specify the consumer group. > > > 2. Every broker using the simple consumer api(OffsetFetchKey) to query > > > the commit offset in the log retention process. The client can commit > > > offset or not. > > > 3. It does not need to distinguish the follower brokers or leader > > > brokers, every brokers can query. > > > 4. We don't need to change the protocols, we mainly change the log > > > retention process in the log manager. > > > > > > One question is the query min offset need O(partitions * groups) time > > > complexity, another alternative is to build an internal topic
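The min-commit-offset computation David describes — find the smallest committed offset across groups for a partition, then delete only the segments that fall wholly below it — can be sketched as follows. This is an illustration under assumed data shapes, not the actual broker code:

```python
def min_commit_offset(offsets):
    """Smallest committed offset across all tracked groups for one
    partition -- the safest deletion point. Returns None when no group
    has committed, in which case consumed retention skips the partition
    (case (2) above)."""
    return min(offsets) if offsets else None

def consumed_segments(segments, offsets):
    """segments: list of (base_offset, next_offset) pairs per log segment.
    A segment is eligible for consumed retention only when every group
    has consumed past its last offset. Repeating this query for every
    partition and group is the O(partitions * groups) cost mentioned in
    the thread; caching per-partition min offsets in an internal topic is
    the O(1) alternative David raises."""
    floor = min_commit_offset(offsets)
    if floor is None:
        return []
    return [seg for seg in segments if seg[1] <= floor]
```

A group that committed offset 95 keeps every segment containing offsets at or above 95 alive, no matter how far ahead the other groups are.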
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David This is a very timely KIP given the number of use cases in the streams processing pipeline that need consumed log retention management. I just wanted to make sure some questions that Becket and Dong asked are described in the KIP. 1. How is the configuration setup per topic to know what is the set of consumer groups that are "subscribed" to this topic whose committed offsets will be tracked. Can we have more details on how this will be dynamically tracked as consumers come and go. 2. Is there a timeout to determine if a consumer group has stopped committing offsets to topic partitions that they had earlier consumed? Or the consumed log retention will track each known consumer/consumer groups' committed offsets and stop any cleaning if a consumer disappears after consuming. This is to Dong's earlier question. 3. Can the log.retention value be set to 0 to indicate the log is set to be cleaned to the min committed offset immediately after it has been consumed? 4. What guarantee is given on when the consumed log will eventually be cleaned. If the log.retention timeout is enabled for a consumed offset and a new consumer starts consuming from the beginning then is the min committed offset value changed and the timer based on log.retention timeout restarted? This all relates to active and inactive consumers, and if the set changes dynamically, how does the consumed log retention actually make progress. regards renu On Mon, Oct 10, 2016 at 1:05 AM, Dong Lin wrote: > Hey David, > > Thanks for reply. Please see comment inline. > > On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) > wrote: > > > Hi Dong > >Thanks for the questions: > > > > 1. Now we don't distinguish inactive or active groups. Because in some > > case maybe inactive group will become active again, and using the > previous > > commit offset. 
> > > > So we will not delete the log segment in the consumer retention if there > > are some groups consume but not commit, but the log segment can be > delete by > > the force retention. > > > > So in the example I provided, the consumed log retention will be > effectively disabled, right? This seems to be a real problem in operation > -- we don't want log retention to be un-intentionally disabled simply > because someone start a tool to consume from that topic. Either this KIP > should provide a way to handle this, or there should be a way for operator > to be aware of such case and be able to re-eanble consumed log retention > for the topic. What do you think? > > > > > 2. These configs are used to determine the out of date time of the > > consumed retention, like the parameters of the force retention > > (log.retention.hours, log.retention.minutes, log.retention.ms). For > > example, users want the save the log for 3 days, after 3 days, kafka will > > delete the log segments which are > > > > consumed by all the consumer group. The log retention thread need these > > parameters. > > > > It makes sense to have configs such as log.retention.ms -- it is used to > make data available for up to a configured amount of time before it is > deleted. My question is what is the use-case for making log available for > another e.g. 3 days after it has been consumed by all consumer groups. The > purpose of this KIP is to allow log to be deleted right as long as all > interested consumer groups have consumed it. Can you provide a use-case for > keeping log available for longer time after it has been consumed by all > groups? > > > > > > Thanks, > > David > > > > > > > Hey David, > > > > > > Thanks for the KIP. Can you help with the following two questions: > > > > > > 1) If someone start a consumer (e.g. 
kafka-console-consumer) to > consume a > > > topic for debug/validation purpose, a randome consumer group may be > > created > > > and offset may be committed for this consumer group. If no offset > commit > > is > > > made for this consumer group in the future, will this effectively > > > disable consumed log retention for this topic? In other words, how do > > this > > > KIP distinguish active consumer group from inactive ones? > > > > > > 2) Why do we need new configs such as log.retention.commitoffset. > hours? > > Can > > >we simply delete log segments if consumed log retention is enabled for > > this > > > topic and all consumer groups have consumed messages in the log > segment? > > > > > > Thanks, > > > Dong > > > > > > > > > > > >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) > > wrote: > > > > > > > Hi Becket, > > > > > > > > Thanks for the feedback: > > > > 1. We use the simple consumer api to query the commit offset, so we > > don't > > > > need to specify the consumer group. > > > > 2. Every broker using the simple consumer api(OffsetFetchKey) to > query > > > > the commit offset in the log retention process. The client can > commit > > > > offset or not. > > > > 3. It does not need to distinguish the follower brokers or leader > > > > brokers, every brokers can query. > >
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hey David, Thanks for reply. Please see comment inline. On Mon, Oct 10, 2016 at 12:40 AM, Pengwei (L) wrote: > Hi Dong >Thanks for the questions: > > 1. Now we don't distinguish inactive or active groups. Because in some > case maybe inactive group will become active again, and using the previous > commit offset. > > So we will not delete the log segment in the consumer retention if there > are some groups consume but not commit, but the log segment can be delete by > the force retention. > So in the example I provided, the consumed log retention will be effectively disabled, right? This seems to be a real problem in operation -- we don't want log retention to be un-intentionally disabled simply because someone starts a tool to consume from that topic. Either this KIP should provide a way to handle this, or there should be a way for operator to be aware of such case and be able to re-enable consumed log retention for the topic. What do you think? > 2. These configs are used to determine the out of date time of the > consumed retention, like the parameters of the force retention > (log.retention.hours, log.retention.minutes, log.retention.ms). For > example, users want the save the log for 3 days, after 3 days, kafka will > delete the log segments which are > > consumed by all the consumer group. The log retention thread need these > parameters. > > It makes sense to have configs such as log.retention.ms -- it is used to make data available for up to a configured amount of time before it is deleted. My question is what is the use-case for making log available for another e.g. 3 days after it has been consumed by all consumer groups. The purpose of this KIP is to allow the log to be deleted as soon as all interested consumer groups have consumed it. Can you provide a use-case for keeping log available for longer time after it has been consumed by all groups? > > Thanks, > David > > > > Hey David, > > > > Thanks for the KIP. 
Can you help with the following two questions: > > > > 1) If someone start a consumer (e.g. kafka-console-consumer) to consume a > > topic for debug/validation purpose, a randome consumer group may be > created > > and offset may be committed for this consumer group. If no offset commit > is > > made for this consumer group in the future, will this effectively > > disable consumed log retention for this topic? In other words, how do > this > > KIP distinguish active consumer group from inactive ones? > > > > 2) Why do we need new configs such as log.retention.commitoffset.hours? > Can > >we simply delete log segments if consumed log retention is enabled for > this > > topic and all consumer groups have consumed messages in the log segment? > > > > Thanks, > > Dong > > > > > > > >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) > wrote: > > > > > Hi Becket, > > > > > > Thanks for the feedback: > > > 1. We use the simple consumer api to query the commit offset, so we > don't > > > need to specify the consumer group. > > > 2. Every broker using the simple consumer api(OffsetFetchKey) to query > > > the commit offset in the log retention process. The client can commit > > > offset or not. > > > 3. It does not need to distinguish the follower brokers or leader > > > brokers, every brokers can query. > > > 4. We don't need to change the protocols, we mainly change the log > > > retention process in the log manager. > > > > > > One question is the query min offset need O(partitions * groups) time > > > complexity, another alternative is to build an internal topic to save > every > > > partition's min offset, it can reduce to O(1). > > > I will update the wiki for more details. > > > > > > Thanks, > > > David > > > > > > > > > > Hi Pengwei, > > > > > > > > Thanks for the KIP proposal. It is a very useful KIP. At a high > level, > > > the > > > > proposed behavior looks reasonable to me. > > > > > > > > However, it seems that some of the details are not mentioned in the > KIP. 
> > > > For example, > > > > > > > > 1. How will the expected consumer group be specified? Is it through > a per > > > > topic dynamic configuration? > > > > 2. How do the brokers detect the consumer offsets? Is it required > for a > > > > consumer to commit offsets? > > > > 3. How do all the replicas know the about the committed offsets? > e.g. 1) > > > > non-coordinator brokers which do not have the committed offsets, 2) > > > > follower brokers which do not have consumers directly consuming from > it. > > > > 4. Is there any other changes need to be made (e.g. new protocols) in > > > > addition to the configuration change? > > > > > > > > It would be great if you can update the wiki to have more details. > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) > > > wrote: > > > > > > > > > Hi All, > > > > >I have made a KIP to enhance the log retention, details as > follows: > > > > > https://cwiki.apache.org/confluence/dis
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Dong Thanks for the questions: 1. Now we don't distinguish inactive or active groups, because in some cases an inactive group may become active again and use the previous commit offset. So we will not delete the log segment in the consumed retention if some groups consume but do not commit, but the log segment can still be deleted by the forced retention. 2. These configs are used to determine the out-of-date time of the consumed retention, like the parameters of the forced retention (log.retention.hours, log.retention.minutes, log.retention.ms). For example, users want to save the log for 3 days; after 3 days, Kafka will delete the log segments which are consumed by all the consumer groups. The log retention thread needs these parameters. Thanks, David > Hey David, > > Thanks for the KIP. Can you help with the following two questions: > > 1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a > topic for debug/validation purpose, a random consumer group may be created > and offset may be committed for this consumer group. If no offset commit is > made for this consumer group in the future, will this effectively > disable consumed log retention for this topic? In other words, how do this > KIP distinguish active consumer group from inactive ones? > > 2) Why do we need new configs such as log.retention.commitoffset.hours? Can >we simply delete log segments if consumed log retention is enabled for this > topic and all consumer groups have consumed messages in the log segment? > > Thanks, > Dong > > > >On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) wrote: > > > Hi Becket, > > > > Thanks for the feedback: > > 1. We use the simple consumer api to query the commit offset, so we don't > > need to specify the consumer group. > > 2. Every broker using the simple consumer api(OffsetFetchKey) to query > > the commit offset in the log retention process. The client can commit > > offset or not. > > 3. 
We do not need to distinguish follower brokers from leader brokers; every broker can query.
> > 4. We don't need to change any protocols; we mainly change the log retention process in the log manager.
> >
> > One concern is that querying the min offset needs O(partitions * groups) time; an alternative is to build an internal topic that saves every partition's min offset, which reduces this to O(1).
> > I will update the wiki with more details.
> >
> > Thanks,
> > David
> >
> > > Hi Pengwei,
> > >
> > > Thanks for the KIP proposal. It is a very useful KIP. At a high level, the proposed behavior looks reasonable to me.
> > >
> > > However, it seems that some of the details are not mentioned in the KIP. For example,
> > >
> > > 1. How will the expected consumer group be specified? Is it through a per-topic dynamic configuration?
> > > 2. How do the brokers detect the consumer offsets? Is it required for a consumer to commit offsets?
> > > 3. How do all the replicas know about the committed offsets? e.g. 1) non-coordinator brokers which do not have the committed offsets, 2) follower brokers which do not have consumers directly consuming from them.
> > > 4. Are there any other changes that need to be made (e.g. new protocols) in addition to the configuration change?
> > >
> > > It would be great if you could update the wiki with more details.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) wrote:
> > > > Hi All,
> > > > I have made a KIP to enhance the log retention, details as follows:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > > > Now starting a discussion thread for this KIP; looking forward to the feedback.
> > > >
> > > > Thanks,
> > > > David
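[Editor's note] The consumed-retention rule discussed in this thread (a segment may be deleted only once every consumer group's committed offset has passed it; partitions with no commits fall back to forced time-based retention) can be sketched as below. This is an illustrative simulation, not Kafka's or the KIP's actual implementation; the function names are hypothetical.

```python
# Illustrative sketch (not Kafka code): decide which log segments a
# consumed-log-retention pass may delete. A segment is deletable only if
# every consumer group's committed offset is past the segment's last offset.

def min_committed_offset(committed_offsets):
    """committed_offsets: dict of group -> committed offset for one partition.
    Returns None if no group has committed; consumed retention then skips
    the partition, leaving deletion to the forced time-based retention."""
    if not committed_offsets:
        return None
    return min(committed_offsets.values())

def deletable_segments(segment_end_offsets, committed_offsets):
    """segment_end_offsets: last offset of each segment, oldest first."""
    safe_point = min_committed_offset(committed_offsets)
    if safe_point is None:
        return []
    return [end for end in segment_end_offsets if end < safe_point]

# Three groups have consumed to different points; only segments fully
# consumed by the slowest group (offset 150) can be deleted.
offsets = {"group-a": 300, "group-b": 150, "group-c": 220}
print(deletable_segments([100, 200, 300], offsets))  # -> [100]
```

Note that taking the minimum across all groups is what makes this the "safest point to delete", as David argues later in the thread.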
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hey David,

Thanks for updating the wiki.

1. I was actually thinking of letting every broker just consume the __consumer_offsets topic. But that seems less efficient if only a few topics are configured for committed-offset-based retention, so querying the committed offsets seems reasonable. From the wiki it is not clear whether the committed-offset query happens synchronously or asynchronously. It is probably better to do it asynchronously, i.e. in a thread other than the log-deleting thread; otherwise querying the committed offsets may slow down or even block log deletion due to a remote call failure.

2. Using the new consumer does not necessarily introduce a new group unless we use Kafka-based group management. But using KafkaConsumer directly to query the committed offsets may not work in this case, because by default it uses the consumer group in the ConsumerConfig. We could use NetworkClient and see if we can reuse some of the code in the new consumer. Since a lot of effort has been spent on deprecating the SimpleConsumer, we probably want to avoid introducing any new usage of it. Anyway, this is an implementation detail and we can figure it out when writing the patch.

3. What I am thinking is whether we will allow multiple policies to be set at the same time. If we do allow that, which of the policies takes precedence? Otherwise it might be confusing for users if they have multiple retention policies set.

In addition to the above, it seems that we need some way to configure the set of consumer groups a topic should be listening on. If it is through a topic config, it would be good to document the configuration name and the format of its value in the wiki as well.

Thanks,

Jiangjie (Becket) Qin

On Sun, Oct 9, 2016 at 7:14 AM, 东方甲乙 <254479...@qq.com> wrote:
> Hi Becket,
> This is David, thanks for the comments. I have updated some info in the wiki. All the changes are described in the workflow.
> Answers to the comments:
> 1.
Each broker only has some of the groups' committed offsets, which are stored in the __consumer_offsets topic; it still has to query other coordinators (other brokers) for some groups' committed offsets.
> So we use the OffsetFetchRequest to query one group's committed offset.
>
> 2. Using the new consumer to query the committed offset would introduce a new group, but if we use the OffsetFetchRequest to query (like the consumer-offset-checker tool: first find the coordinator, then build a channel to query), we will not introduce a new group.
>
> 3. I think KIP-47's functionality is a little different from this KIP, though both modify the log retention.
>
> Thanks,
> David.
>
> ---------- Original Message ----------
> From: "Becket Qin"
> Sent: Sunday, October 9, 2016, 1:00 PM
> To: "dev"
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
> Hi David,
>
> Thanks for the explanation. Could you update the KIP-68 wiki to include the changes that need to be made?
>
> I have a few more comments below:
>
> 1. We already have an internal topic __consumer_offsets to store all the committed offsets. So the brokers can probably just consume from that to get the committed offsets for all the partitions of each group.
>
> 2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer instead of SimpleConsumer. It handles all the leader movements and potential failures.
>
> 3. KIP-47 also has a proposal for a new time-based log retention policy and proposes a new configuration on log retention. It may be worth thinking about the behavior together.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) wrote:
> > Hi Becket,
> >
> > Thanks for the feedback:
> > 1. We use the simple consumer API to query the committed offset, so we don't need to specify the consumer group.
> > 2.
Every broker uses the simple consumer API (OffsetFetchKey) to query the committed offset in the log retention process. The client can commit offsets or not.
> > 3. We do not need to distinguish follower brokers from leader brokers; every broker can query.
> > 4. We don't need to change any protocols; we mainly change the log retention process in the log manager.
> >
> > One concern is that querying the min offset needs O(partitions * groups) time; an alternative is to build an internal topic that saves every partition's min offset, which reduces this to O(1).
> > I will update the wiki with more details.
> >
> > Thanks,
> > David
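[Editor's note] Becket's first point, querying committed offsets asynchronously so that a remote-call failure cannot slow down or block the log-deleting thread, can be sketched as a background refresh of a local cache. The class and parameter names below are illustrative, not from the KIP or Kafka's code.

```python
# Illustrative sketch: a background thread refreshes a cache of committed
# offsets, so the log-deletion thread only ever reads the local cache and
# is never blocked by a slow or failing remote offset query.
import threading

class CommittedOffsetCache:
    def __init__(self, fetch_offsets, refresh_interval_s=1.0):
        self._fetch = fetch_offsets          # remote call; may be slow or fail
        self._interval = refresh_interval_s
        self._lock = threading.Lock()
        self._offsets = {}                   # (group, partition) -> offset
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        while not self._stop.is_set():
            try:
                latest = self._fetch()
            except Exception:
                latest = None                # keep the stale cache on failure
            if latest is not None:
                with self._lock:
                    self._offsets = dict(latest)
            self._stop.wait(self._interval)

    def get(self, group, partition):
        """Non-blocking read; the deletion thread calls this."""
        with self._lock:
            return self._offsets.get((group, partition))

    def stop(self):
        self._stop.set()
        self._thread.join()
```

The trade-off is that the deletion thread may act on slightly stale offsets, which is safe here: a stale committed offset is always lower than the real one, so it can only delay deletion, never delete unconsumed data.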
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Becket,

This is David, thanks for the comments. I have updated some info in the wiki. All the changes are described in the workflow.

Answers to the comments:

1. Each broker only has some of the groups' committed offsets, which are stored in the __consumer_offsets topic; it still has to query other coordinators (other brokers) for some groups' committed offsets. So we use the OffsetFetchRequest to query one group's committed offset.

2. Using the new consumer to query the committed offset would introduce a new group, but if we use the OffsetFetchRequest to query (like the consumer-offset-checker tool: first find the coordinator, then build a channel to query), we will not introduce a new group.

3. I think KIP-47's functionality is a little different from this KIP, though both modify the log retention.

Thanks,
David.

---------- Original Message ----------
From: "Becket Qin"
Sent: Sunday, October 9, 2016, 1:00 PM
To: "dev"
Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

Hi David,

Thanks for the explanation. Could you update the KIP-68 wiki to include the changes that need to be made?

I have a few more comments below:

1. We already have an internal topic __consumer_offsets to store all the committed offsets. So the brokers can probably just consume from that to get the committed offsets for all the partitions of each group.

2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer instead of SimpleConsumer. It handles all the leader movements and potential failures.

3. KIP-47 also has a proposal for a new time-based log retention policy and proposes a new configuration on log retention. It may be worth thinking about the behavior together.

Thanks,

Jiangjie (Becket) Qin

On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) wrote:
> Hi Becket,
>
> Thanks for the feedback:
> 1. We use the simple consumer API to query the committed offset, so we don't need to specify the consumer group.
> 2.
Every broker uses the simple consumer API (OffsetFetchKey) to query the committed offset in the log retention process. The client can commit offsets or not.
> 3. We do not need to distinguish follower brokers from leader brokers; every broker can query.
> 4. We don't need to change any protocols; we mainly change the log retention process in the log manager.
>
> One concern is that querying the min offset needs O(partitions * groups) time; an alternative is to build an internal topic that saves every partition's min offset, which reduces this to O(1).
> I will update the wiki with more details.
>
> Thanks,
> David
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level, the proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP. For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per-topic dynamic configuration?
> > 2. How do the brokers detect the consumer offsets? Is it required for a consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1) non-coordinator brokers which do not have the committed offsets, 2) follower brokers which do not have consumers directly consuming from them.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in addition to the configuration change?
> >
> > It would be great if you could update the wiki with more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) wrote:
> > > Hi All,
> > > I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > > Now starting a discussion thread for this KIP; looking forward to the feedback.
> > >
> > > Thanks,
> > > David
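[Editor's note] The internal-topic alternative David raises, publishing each partition's minimum committed offset so the retention thread does an O(1) lookup instead of an O(partitions * groups) scan, amounts to a compacted key-value store. The sketch below simulates that idea; the class name and topic semantics are illustrative, not something the KIP specifies.

```python
# Illustrative sketch: simulate a compacted internal topic keyed by
# (topic, partition) whose value is the minimum committed offset across
# all consumer groups. Coordinators publish updates; the retention thread
# does a single O(1) lookup per partition instead of querying every group.

class MinOffsetTopic:
    def __init__(self):
        self._latest = {}   # compaction keeps only the latest value per key

    def publish(self, topic, partition, min_offset):
        self._latest[(topic, partition)] = min_offset

    def lookup(self, topic, partition):
        return self._latest.get((topic, partition))

store = MinOffsetTopic()
# A coordinator recomputes the min across its groups and publishes it.
group_offsets = {"group-a": 300, "group-b": 150}
store.publish("my-topic", 0, min(group_offsets.values()))
store.publish("my-topic", 0, 180)   # a later update compacts the old value away
print(store.lookup("my-topic", 0))  # -> 180
```

The cost moves from the read path to the write path: someone must recompute and publish the min whenever a group commits, which is why this remains only an alternative in the discussion.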
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hey David,

Thanks for the KIP. Can you help with the following two questions:

1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a topic for debug/validation purposes, a random consumer group may be created and an offset may be committed for this consumer group. If no offset commit is made for this consumer group in the future, will this effectively disable consumed log retention for this topic? In other words, how does this KIP distinguish active consumer groups from inactive ones?

2) Why do we need new configs such as log.retention.commitoffset.hours? Can we simply delete log segments if consumed log retention is enabled for this topic and all consumer groups have consumed the messages in the log segment?

Thanks,
Dong

On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) wrote:
> Hi Becket,
>
> Thanks for the feedback:
> 1. We use the simple consumer API to query the committed offset, so we don't need to specify the consumer group.
> 2. Every broker uses the simple consumer API (OffsetFetchKey) to query the committed offset in the log retention process. The client can commit offsets or not.
> 3. We do not need to distinguish follower brokers from leader brokers; every broker can query.
> 4. We don't need to change any protocols; we mainly change the log retention process in the log manager.
>
> One concern is that querying the min offset needs O(partitions * groups) time; an alternative is to build an internal topic that saves every partition's min offset, which reduces this to O(1).
> I will update the wiki with more details.
>
> Thanks,
> David
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level, the proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP. For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per-topic dynamic configuration?
> > 2.
How do the brokers detect the consumer offsets? Is it required for a consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1) non-coordinator brokers which do not have the committed offsets, 2) follower brokers which do not have consumers directly consuming from them.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in addition to the configuration change?
> >
> > It would be great if you could update the wiki with more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) wrote:
> > > Hi All,
> > > I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > > Now starting a discussion thread for this KIP; looking forward to the feedback.
> > >
> > > Thanks,
> > > David
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi David,

Thanks for the explanation. Could you update the KIP-68 wiki to include the changes that need to be made?

I have a few more comments below:

1. We already have an internal topic __consumer_offsets to store all the committed offsets. So the brokers can probably just consume from that to get the committed offsets for all the partitions of each group.

2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer instead of SimpleConsumer. It handles all the leader movements and potential failures.

3. KIP-47 also has a proposal for a new time-based log retention policy and proposes a new configuration on log retention. It may be worth thinking about the behavior together.

Thanks,

Jiangjie (Becket) Qin

On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L) wrote:
> Hi Becket,
>
> Thanks for the feedback:
> 1. We use the simple consumer API to query the committed offset, so we don't need to specify the consumer group.
> 2. Every broker uses the simple consumer API (OffsetFetchKey) to query the committed offset in the log retention process. The client can commit offsets or not.
> 3. We do not need to distinguish follower brokers from leader brokers; every broker can query.
> 4. We don't need to change any protocols; we mainly change the log retention process in the log manager.
>
> One concern is that querying the min offset needs O(partitions * groups) time; an alternative is to build an internal topic that saves every partition's min offset, which reduces this to O(1).
> I will update the wiki with more details.
>
> Thanks,
> David
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level, the proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP. For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per-topic dynamic configuration?
> > 2. How do the brokers detect the consumer offsets?
Is it required for a consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1) non-coordinator brokers which do not have the committed offsets, 2) follower brokers which do not have consumers directly consuming from them.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in addition to the configuration change?
> >
> > It would be great if you could update the wiki with more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) wrote:
> > > Hi All,
> > > I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > > Now starting a discussion thread for this KIP; looking forward to the feedback.
> > >
> > > Thanks,
> > > David
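[Editor's note] Becket's suggestion that brokers consume __consumer_offsets and keep the latest record per key is essentially log-compaction semantics. The sketch below simulates materializing such a stream into the per-group committed-offset view a retention pass would need; the record shape here is a simplification, not Kafka's actual offset-commit record format.

```python
# Illustrative sketch: consume a stream of offset-commit records (as a broker
# reading __consumer_offsets might) and keep only the latest committed offset
# per (group, topic, partition) key, mirroring log-compaction semantics.

def materialize_offsets(records):
    """records: iterable of (group, topic, partition, offset) commit events,
    in log order. Returns the compacted view: key -> latest offset."""
    latest = {}
    for group, topic, partition, offset in records:
        latest[(group, topic, partition)] = offset
    return latest

commits = [
    ("group-a", "my-topic", 0, 100),
    ("group-b", "my-topic", 0, 80),
    ("group-a", "my-topic", 0, 120),   # supersedes group-a's earlier commit
]
view = materialize_offsets(commits)
print(view[("group-a", "my-topic", 0)])  # -> 120
# The consumed-retention safe point for my-topic partition 0 is the min
# across all groups that have committed:
print(min(v for (g, t, p), v in view.items() if (t, p) == ("my-topic", 0)))  # -> 80
```

This also illustrates the efficiency concern Becket raises: every broker would materialize offsets for all groups and topics, even if only a few topics use committed-offset-based retention.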
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Becket,

Thanks for the feedback:

1. We use the simple consumer API to query the committed offset, so we don't need to specify the consumer group.

2. Every broker uses the simple consumer API (OffsetFetchKey) to query the committed offset in the log retention process. The client can commit offsets or not.

3. We do not need to distinguish follower brokers from leader brokers; every broker can query.

4. We don't need to change any protocols; we mainly change the log retention process in the log manager.

One concern is that querying the min offset needs O(partitions * groups) time; an alternative is to build an internal topic that saves every partition's min offset, which reduces this to O(1).

I will update the wiki with more details.

Thanks,
David

> Hi Pengwei,
>
> Thanks for the KIP proposal. It is a very useful KIP. At a high level, the proposed behavior looks reasonable to me.
>
> However, it seems that some of the details are not mentioned in the KIP. For example,
>
> 1. How will the expected consumer group be specified? Is it through a per-topic dynamic configuration?
> 2. How do the brokers detect the consumer offsets? Is it required for a consumer to commit offsets?
> 3. How do all the replicas know about the committed offsets? e.g. 1) non-coordinator brokers which do not have the committed offsets, 2) follower brokers which do not have consumers directly consuming from them.
> 4. Are there any other changes that need to be made (e.g. new protocols) in addition to the configuration change?
>
> It would be great if you could update the wiki with more details.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) wrote:
> > Hi All,
> > I have made a KIP to enhance the log retention, details as follows:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> > Now starting a discussion thread for this KIP; looking forward to the feedback.
> >
> > Thanks,
> > David
Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi Pengwei,

Thanks for the KIP proposal. It is a very useful KIP. At a high level, the proposed behavior looks reasonable to me.

However, it seems that some of the details are not mentioned in the KIP. For example,

1. How will the expected consumer group be specified? Is it through a per-topic dynamic configuration?
2. How do the brokers detect the consumer offsets? Is it required for a consumer to commit offsets?
3. How do all the replicas know about the committed offsets? e.g. 1) non-coordinator brokers which do not have the committed offsets, 2) follower brokers which do not have consumers directly consuming from them.
4. Are there any other changes that need to be made (e.g. new protocols) in addition to the configuration change?

It would be great if you could update the wiki with more details.

Thanks,

Jiangjie (Becket) Qin

On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) wrote:
> Hi All,
> I have made a KIP to enhance the log retention, details as follows:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention
> Now starting a discussion thread for this KIP; looking forward to the feedback.
>
> Thanks,
> David
[DISCUSS] KIP-68 Add a consumed log retention before log retention
Hi All,

I have made a KIP to enhance the log retention, details as follows:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-68+Add+a+consumed+log+retention+before+log+retention

Now starting a discussion thread for this KIP; looking forward to the feedback.

Thanks,
David