Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-05-27 Thread hudeqi
Thank you for your attention and reply. Here are my replies to your questions:

1. If strategy=safe_latest and there is no committed offset, whether the group 
is newly started is determined in this way: when the group is started, a 
timestamp "createTimeStamp" is recorded as the start time of the group. When 
the offset needs to be reset, this timestamp is added to "ListOffsetsRequest" 
as a new field. The server compares "createTimeStamp" with "logStartTime", the 
timestamp of the first message in each partition. If "createTimeStamp" is 
greater than "logStartTime", the group is newly started for this partition and 
consumes from the latest; otherwise the partition is newly expanded and needs 
to be consumed from the earliest. For details, you can see the related JIRA and PR.
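
To make this concrete, here is a rough sketch of the broker-side comparison 
(the names are illustrative only; the exact fields and helpers are in the PR 
and may differ from this sketch):

    // Sketch only: the proposed safe_latest decision when a group has no
    // committed offset. createTimeStamp would be a new field on
    // ListOffsetsRequest carrying the start time of the group.
    static long safeLatestResetOffset(long createTimeStamp, // group start time
                                      long logStartTime,    // timestamp of the first message in the partition
                                      long logStartOffset,
                                      long logEndOffset) {
        if (createTimeStamp > logStartTime) {
            // The partition already existed when the group started: the group is
            // newly started for this partition, so consume from the latest.
            return logEndOffset;
        }
        // Otherwise the partition was created after the group started (partition
        // expansion), so consume from the earliest to avoid losing data.
        return logStartOffset;
    }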



2. Strictly speaking, nearest is not a strategy on the same level as 
earliest/latest/safe_latest/earliest_on_start/latest_on_start. It is more like 
an auxiliary strategy that only deals with out-of-range. So if you set 
nearest.offset.reset=true, then no matter what "auto.offset.reset" is set to, 
the nearest strategy is applied on out-of-range (reset to the earliest if the 
position was under the range, or to the latest if it was over the range). For 
the case where there is no committed offset, nearest naturally has no effect; 
instead, the main strategy (auto.offset.reset) decides.
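
As a rough illustration of that out-of-range handling (a sketch only; the 
variable names are illustrative, not the actual consumer internals):

    // Only reached when an out-of-range error occurs and nearest.offset.reset=true.
    static long nearestResetOffset(long invalidPosition, long logStartOffset, long logEndOffset) {
        if (invalidPosition < logStartOffset) {
            return logStartOffset;   // under the range -> reset to earliest
        }
        return logEndOffset;         // over the range  -> reset to latest
    }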




3. I have added this to the table in the "Proposed Changes" section of KIP-842.




4. The meaning of nearest.offset.reset has already been explained in point 2. 
This configuration is disabled by default, that is to say, on out-of-range the 
reset is performed according to the main strategy (auto.offset.reset).



> -Original Message-
> From: "deng ziming" 
> Sent: 2022-05-27 16:02:53 (Friday)
> To: dev@kafka.apache.org
> Cc: 
> Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> 
> Thank you for this KIP, the motivation makes sense to me, left some questions:
> 
> 1. If strategy=safe_latest and there is not committed offset, we have 2 
> choices based on whether the group is started newly, can you elaborate on how 
> can we decide the group is started newly? It would be clear.
> 
> 2. If strategy=nearest and there is not committed offset, its behavior is 
> determined by the earliest, or latest, or safe_latest used together. can you 
> elaborate on it more clearly?
> 
> 3. Can you also add a column "current reset behavior” and change "reset 
> behavior” to “proposed reset behavior”, then we can be clear that this has no 
> effect on current behavior.
> 
> 4. You added a new config “nearest.offset.reset” and only explain what will 
> happen when we set it true, you’d better explain what will happen it it is 
> false
> 
> --
> Best,
> Ziming
> 
> 
> > On May 26, 2022, at 10:54 AM, 胡德祺 <16120...@bjtu.edu.cn> wrote:
> > 
> > Hi all,
> > Why is no one talking about this?
> > best
> > hudeqi
> > 
> > On 2022-05-23 17:45:53, "胡德祺" <16120...@bjtu.edu.cn> wrote:
> > 
> > Hi all,
> > 
> > I have written a new KIP to add some group offset reset mechanisms. Please 
> > take a look here: https://cwiki.apache.org/confluence/x/xhyhD
> > 
> > best
> > hudeqi
>




Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-05-30 Thread hudeqi
Thank you for your reply.
According to the current implementation in the pull request, it may not be 
possible to directly remove the enumeration value of nearest. However, on the 
whole, putting nearest in the OffsetResetStrategy enumeration class may cause 
some misunderstandings in use. There are two solutions. One is to make a 
defensive check on the value of "auto.offset.reset" when initializing the 
consumer, and the other is to change the implementation, for example by adding 
an additional variable named "auxiliaryStrategy" to the SubscriptionState class 
that is used to reset the offset when out-of-range is triggered.
Besides this, are there any other problems? What can I do next to push this KIP 
toward adoption? This is my first time doing this, so I don't understand the 
process very well.

> -Original Message-
> From: "deng ziming" 
> Sent: 2022-05-30 13:23:53 (Monday)
> To: dev@kafka.apache.org
> Cc: 
> Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> 
> Thank you for your reply.
> 
> According to your description, strategy=nearest is redundant, right? We 
only rely on nearest.offset.reset=true/false to handle OffsetOutOfRange, I 
think we can directly remove this enum value, WDYT?
> 
> --
> Best,
> Ziming
> 
> > On May 27, 2022, at 5:19 PM, hudeqi <16120...@bjtu.edu.cn> 
wrote:
> > 
> > Thank you for your attention and reply. Here are my reply to your 
questions:
> > 
> > 1. If strategy=safe_latest and there is not committed offset, whether 
the group is newly started is determined in this way: when the group is 
started, a timestamp "createTimeStamp" will be passed as the start time of the 
group. When the offset needs to be reset, the timestamp will be added to 
"ListOffsetsRequest" as a new field. The server compares the timestamp 
"createTimeStamp" with the timestamp "logStartTime", which is the first message 
time for each partition. If "createTimeStamp" "greater than "logStartTime" 
means that the group is newly started for this partition and consumed from the 
latest, otherwise it means that the partition is newly expanded and needs to be 
consumed from the earliest. For details, you can see related jira and pr.
> > 
> > 
> > 
> > 2. Strictly speaking, nearest is not a level strategy with 
earlyliest/latest/safe_latest/earliest_on_start/latest_on_start. It is more 
like an auxiliary strategy, which is only dealt with out-of-range. So if you 
set nearest.offset.reset=true, no matter what strategy "auto.offset.reset" is 
set to, it will be performed according to the strategy of nearest when 
out-of-range (to the earliest if it was under the range , or to the latest if 
it was over the range), and for the case where no committed offset, nearest 
will naturally have no effect, instead, it is determined by the main 
strategy(auto.offset.reset).
> > 
> > 
> > 
> > 
> > 3. This I have added to the form of module "Proposed Changes" in 
kip-842.
> > 
> > 
> > 
> > 
> > 4. The meaning of nearest.offset.reset has been clearly expressed in 
point 2, this configuration is disabled default, that is to say, when 
out-of-range, reset strategy is performed according to the main strategy 
(auto.offset.reset).
> > 
> > 
> > 
> >> -Original Message-
> >> From: "deng ziming" 
> >> Sent: 2022-05-27 16:02:53 (Friday)
> >> To: dev@kafka.apache.org
> >> Cc: 
> >> Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> >> 
> >> Thank you for this KIP, the motivation makes sense to me, left 
some questions:
> >> 
> >> 1. If strategy=safe_latest and there is not committed offset, we 
have 2 choices based on whether the group is started newly, can you elaborate 
on how can we decide the group is started newly? It would be clear.
> >> 
> >> 2. If strategy=nearest and there is not committed offset, its 
behavior is determined by the earliest, or latest, or safe_latest used 
together. can you elaborate on it more clearly?
> >> 
> >> 3. Can you also add a column "current reset behavior” and change 
"reset behavior” to “proposed reset behavior”, then we can be clear that this 
has no effect on current behavior.
> >> 
> >> 4. You added a new config “nearest.offset.reset” and only explain 
what will happen when we set it true, you’d better explain what will happen it 
it is false
> >> 
> >> --
> >> Best,
> >> Ziming
> >> 
> >> 
> >>> On May 26, 2022, at 10:54 AM, 胡德祺 
<16120...@bjtu.edu.cn> wrote:
> >>> 
> >>> Hi all,
> >>> Why is no one talking about this?
> >>> best
> >>> hudeqi
> >>> 
> >>> On 2022-05-23 17:45:53, "胡德祺" <16120...@bjtu.edu.cn> wrote:
> >>> 
> >>> Hi all,
> >>> 
> >>> I have written a new KIP to add some group offset reset 
> >>> mechanisms. Please take a look here: https://cwiki.apache.org/confluence/x/xhyhD
> >>> 
> >>> best
> >>> hudeqi
> >> 
> > 
> >


--
Best,
hudeqi


Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-06-07 Thread hudeqi
I think so too, what about Guozhang Wang and Luke Chen? Can I initiate a voting 
process?

Best,
hudeqi

> -Original Message-
> From: "邓子明" 
> Sent: 2022-06-07 10:23:37 (Tuesday)
> To: dev@kafka.apache.org
> Cc: 
> Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> 


Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-07-01 Thread hudeqi
Thanks for your attention and reply.
If it is put together with "latest", "safe_latest" does look a bit strange, 
which may leave users unsure which one to choose. In essence, "safe_latest" is 
meant to solve the situation where data may be lost after extending partitions, 
so I think it is better to remove "safe_latest" and move this fix into "latest".
In addition, I think that before this KIP was proposed, many people did not 
know about this case, and it is even less likely that they handle it in the 
application. If Kafka can handle it by itself, that would be best.
As for nearest, I think it is more complicated than the above case and brings 
little benefit. This kind of auxiliary strategy for out-of-range can also be 
removed if it is really hard to digest.
"latest/earliest_on_start" is indeed mentioned by KAFKA-3370. I think it is 
more useful, so I put it into this KIP as well.

Best,
hudeqi

"Matthias J. Sax" 写道:
> Thanks for the KIP.
> 
> I don't think I fully digested the proposal yet, but my first reaction 
> is: this is quite complicated. Frankly, I am worried about complexity 
> and usability.
> 
> Especially the option `safe_latest` is a "weird" one IMHO, and `nearest` 
> is even more complex.
> 
> The problem at hand (as I understand it from the Jira) is a real one, 
> but I am wondering if it would be something that should be addressed by 
> the application? If you pass in strategy `none`, and a new partition is 
> added, you can react to it by custom code. For regular startup you can 
> still go with "latest" to avoid reprocessing the history.
> 
> Adding "latest/earliest_on_start" seems useful, as it seems to also 
> address https://issues.apache.org/jira/browse/KAFKA-3370
> 
> 
> -Matthias
> 
> 
> On 6/7/22 12:55 AM, hudeqi wrote:
> > I think so too, what about Guozhang Wang and Luke Chen? Can I initiate a 
> > voting process?
> > 
> > Best,
> > hudeqi
> > 
> > > -Original Message-
> > > From: "邓子明" 
> > > Sent: 2022-06-07 10:23:37 (Tuesday)
> > > To: dev@kafka.apache.org
> > > Cc:
> > > Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> > >
> > 


Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-07-01 Thread hudeqi
Thanks for your attention and reply.
Regarding the problem raised by this KIP, if you have other ideas or solutions, 
you are welcome to put them forward. Thank you.

Best,
hudeqi

"David Jacot" 写道:
> Thanks for the KIP.
> 
> I read it and I am also worried by the complexity of the new
> configurations. They are not easy to grasp. I need to digest it a bit more,
> I think.
> 
> Best,
> David
> 
> On Wed, Jun 29, 2022 at 02:25, Matthias J. Sax  wrote:
> 
> > Thanks for the KIP.
> >
> > I don't think I fully digested the proposal yet, but my first reaction
> > is: this is quite complicated. Frankly, I am worried about complexity
> > and usability.
> >
> > Especially the option `safe_latest` is a "weird" one IMHO, and `nearest`
> > is even more complex.
> >
> > The problem at hand (as I understand it from the Jira) is a real one,
> > but I am wondering if it would be something that should be addressed by
> > the application? If you pass in strategy `none`, and a new partition is
> > added, you can react to it by custom code. For regular startup you can
> > still go with "latest" to avoid reprocessing the history.
> >
> > Adding "latest/earliest_on_start" seems useful, as it seems to also
> > address https://issues.apache.org/jira/browse/KAFKA-3370
> >
> >
> > -Matthias
> >
> >
> > On 6/7/22 12:55 AM, hudeqi wrote:
> > > I think so too, what about Guozhang Wang and Luke Chen? Can I initiate a
> > voting process?
> > >
> > > Best,
> > > hudeqi
> > >
> > > > -Original Message-
> > > > From: "邓子明" 
> > > > Sent: 2022-06-07 10:23:37 (Tuesday)
> > > > To: dev@kafka.apache.org
> > > > Cc:
> > > > Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> > > >
> > > 
> >


Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-07-01 Thread hudeqi
Thanks for your attention and reply.
Having chatted with Guozhang Wang on KAFKA-12478 before, I came up with an idea 
similar to yours. It is just implemented not on the client side but on the 
server side: first, find out all the groups subscribed to this topic before 
extending partitions, then let these groups commit an initial offset of 0 for 
the new expanded partitions (also using AdminClient). Finally, the real 
process of adding partitions is carried out. In this way, the problem can also 
be completely solved.
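
A minimal sketch of that ordering with the existing Admin API is below (topic 
name, group id, and partition counts are placeholders, the discovery of the 
subscribed groups is omitted, and whether the coordinator accepts commits for 
not-yet-created partitions still has to be verified, so treat this as a sketch 
only):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewPartitions;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ExpandPartitionsSafely {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            try (Admin admin = Admin.create(props)) {
                // 1. For every group subscribed to the topic (discovery omitted),
                //    commit an initial offset of 0 for the partitions about to be
                //    added (here: growing "my-topic" from 3 to 6 partitions).
                Map<TopicPartition, OffsetAndMetadata> initialOffsets = new HashMap<>();
                for (int p = 3; p < 6; p++) {
                    initialOffsets.put(new TopicPartition("my-topic", p), new OffsetAndMetadata(0L));
                }
                admin.alterConsumerGroupOffsets("my-group", initialOffsets).all().get();

                // 2. Only then perform the real partition expansion.
                admin.createPartitions(
                    Collections.singletonMap("my-topic", NewPartitions.increaseTo(6))).all().get();
            }
        }
    }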

Best,
hudeqi

"Matthew Howlett" 写道:
> My first reaction also is that the proposed configuration is surely too
> complicated.
> 
> It seems like an ideal solution from a usability perspective (always a good
> place to start) would be if the consumer just automatically behaved in this
> way. To make that work:
> 1. auto.offset.reset=latest would need to behave like
> auto.offset.reset=earliest in the case where a consumer is in a group, and
> is assigned a newly created partition. This might seem a bit too "magic",
> but from the perspective of the group, I think it makes conceptual sense
> and people wouldn't find it surprising. Also, I don't think anyone would be
> relying on the old behavior.
> 2. The group would need to detect the newly created partitions and
> rebalance pretty quickly (this is not the case currently). The longer the
> delay, the more tenuous the idea of changing the auto.offset.reset behavior
> in this special circumstance.
> 
> I have a feeling this approach has implementation challenges (haven't
> thought deeply), just throwing it out there.
> 
> 
> On Wed, Jun 29, 2022 at 4:57 AM David Jacot  wrote:
> 
> > Thanks for the KIP.
> >
> > I read it and I am also worried by the complexity of the new
> > configurations. They are not easy to grasp. I need to digest it a bit more,
> > I think.
> >
> > Best,
> > David
> >
> > On Wed, Jun 29, 2022 at 02:25, Matthias J. Sax  wrote:
> >
> > > Thanks for the KIP.
> > >
> > > I don't think I fully digested the proposal yet, but my first reaction
> > > is: this is quite complicated. Frankly, I am worried about complexity
> > > and usability.
> > >
> > > Especially the option `safe_latest` is a "weird" one IMHO, and `nearest`
> > > is even more complex.
> > >
> > > The problem at hand (as I understand it from the Jira) is a real one,
> > > but I am wondering if it would be something that should be addressed by
> > > the application? If you pass in strategy `none`, and a new partition is
> > > added, you can react to it by custom code. For regular startup you can
> > > still go with "latest" to avoid reprocessing the history.
> > >
> > > Adding "latest/earliest_on_start" seems useful, as it seems to also
> > > address https://issues.apache.org/jira/browse/KAFKA-3370
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/7/22 12:55 AM, hudeqi wrote:
> > > > I think so too, what about Guozhang Wang and Luke Chen? Can I initiate
> > a
> > > voting process?
> > > >
> > > > Best,
> > > > hudeqi
> > > >
> > > > > -Original Message-
> > > > > From: "邓子明" 
> > > > > Sent: 2022-06-07 10:23:37 (Tuesday)
> > > > > To: dev@kafka.apache.org
> > > > > Cc:
> > > > > Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> > > > >
> > > > 
> > >
> >


Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-07-08 Thread hudeqi
Regarding the option to integrate the repair logic into "latest", I understand 
your concern about this approach: backward compatibility.
But we should have a consensus: the problem of data loss due to expanding 
partitions is indeed caused by Kafka's own design. A user may configure 
"latest" because they do not want to consume from the earliest when the app is 
first deployed, or because the lag is too large, or because consumption has 
exceeded the maximum offset and should simply continue from the latest. When 
expanding partitions, however, the user will definitely not want to consume 
from the latest, unless they clearly know what this means. Therefore, it is 
necessary to solve this problem without causing other problems.
Therefore, for the method of adding an "init.offset.reset" option, there is a 
problem: this configuration must be set to "earliest" to avoid that situation, 
but it will also cause a new group to consume from the earliest, which goes 
against the idea of consuming from the latest at the beginning (it brings other 
problems).
The same is true for the method of setting auto.offset.reset to "earliest" and 
calling seekToEnd() on new deployments: to avoid this case, "auto.offset.reset" 
has no choice but to be set to "earliest", so when an out-of-range occurs the 
consumer will also reset to the earliest, causing duplicate consumption 
(bringing other problems).
So I think it is best to fix it in a black-box way and solve it fundamentally. 
It does not require users to perceive this problem, nor does the user's 
understanding of "auto.offset.reset" need to change, and there is no complexity 
from extra parameter configuration (users don't necessarily know how to combine 
these parameters). As for the compatibility issue, I think it is enough to 
enrich the test cases after the fix. What do you think?
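
For reference, that workaround (configure "earliest" and seek to the end once on 
the very first deployment) would look roughly like the sketch below; deciding 
that this is the first deployment is exactly the part left to the application 
(the system property used here is purely hypothetical, as are the topic and 
group names):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SeekToEndOnFirstDeploy {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // The application itself must know whether this is the very first deployment.
            final boolean firstDeployment = Boolean.getBoolean("app.first.deployment");

            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton("my-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    if (firstDeployment) {
                        // Skip the history only on the very first start of the group.
                        consumer.seekToEnd(partitions);
                    }
                    // On later rebalances, newly expanded partitions have no committed
                    // offset and fall back to "earliest", so their data is not lost.
                }
            });
            // poll loop omitted
        }
    }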

"Matthias J. Sax" 写道:
> I am not sure if we can/should change the behavior of existing 
> "latest/earliest" due to backward compatibility concerns. While I agree 
> that many users might not know the fine details how both behave, it 
> would still be a change that might break other people that do understand 
> the details and rely on it.
> 
> I also agree that having both "latest" and "safe_latest" might be 
> difficult, as users might not know which one to choose?
> 
> Maybe we should have two configs instead of one? `auto.offset.reset`, as 
> the name suggests, resets the offset automatically, and thus it's 
> current behavior is actually well defined and sound. -- What seems to be 
> missing is an `init.offset` config that is only used if there is _no_ 
> committed offsets, but it's not used when the consumer has a position 
> already (either via getting a committed offset or via seek())?
> 
> 
> For the original use-case you mentioned, that you want to start from 
> "latest" when the app starts, but if a new partition is added you want 
> to start from "earliest" it seem that the right approach would be to 
> actually configure "earliest", and when the app is deployed for the 
> first time, use a `seekToEnd()` to avoid triggering auto-offset-reset?
> 
> 
> Thoughts?
> 
> 
> -Matthias
> 
> 
> On 7/1/22 6:03 AM, hudeqi wrote:
> > Thanks for your attention and reply.
> > Having chatted with Guozhang Wang at KAFKA-12478 before, I came up with an 
> > idea similar to yours. It's just not implemented on the client side, but on 
> > the server side: Firstly, find out all the groups subscribed to this topic 
> > before extending partitions, and then let these groups commit an initial 
> > offset 0 for these new expanded partitions (also using adminClient). 
> > Finally, the real process of adding partitions is carried out. In this way, 
> > the problem can also be completely solved.
> > 
> > Best,
> > hudeqi
> > 
> > "Matthew Howlett" 写道:
> >> My first reaction also is that the proposed configuration is surely too
> >> complicated.
> >>
> >> It seems like an ideal solution from a usability perspective (always a good
> >> place to start) would be if the consumer just automatically behaved in this
> >> way. To make that work:
> >> 1. auto.offset.reset=latest would need to behave like
> >> auto.offset.reset=earliest in the case where a consumer is in a group, and
> >> is assigned a newly created partition. This might seem a bit too "magic",
> >> but from the perspective of the group, I think it makes conceptual sense
> >> and people wouldn't find it surprising. Also, I don't think anyone would be
> >> relying on the old behavior.
> >> 2. The group would need to detect the newly created partitions and
> >> rebalance pretty quickly (this is not the case currently). The longer the
> >> delay, the more tenuous the idea of changing the auto.offset.reset behavior
> >> in this special circumstance.
> >>
> >> I have a feeling this approach has implementation challenges (haven't
> >> thought deeply), just throwing it out there.
> >>
> >>

Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-07-13 Thread Guozhang Wang
Hello Deqi,

Thanks for bringing this KIP, and sorry for getting back to you so late.

I do think that separating the reset policy for the two scenarios: 1) when
we get an out-of-range when polling records, likely due to log being
truncated, 2) when we start fetching and there's no committed offset, would
be preferable as we have seen many places where people would prefer to use
different strategies for these two scenarios. At the same time, I also
share other's concerns that the current proposal is trying to mingle too
many features together which makes it unnecessarily complicated and also
makes the compatibility story trickier.

My reading is that you want to achieve the following things within this
KIP:

a) separate the two scenarios for reset strategies, as I mentioned above.
This to me is the most compelling motivation.
b) introduce new reset policies, a.k.a. "nearest", in addition to earliest and
latest. This has been proposed quite a while ago to add more flexibilities
in the policies.
c) tailor the reset policy of a topic for a specific consumer group. I.e.
when a consumer group starts consuming, we want to let it start from
"latest", but once the consumer group starts, newly added partitions would
be using "earliest" instead to avoid data loss.

I think trying to compound all these things in this one KIP makes it a bit
too mingled, and complicated. Plus, we also need to make sure that we are
compatible with the old behaviors if users only set "earliest" or "latest",
and expect that to impact both scenarios.

I thought about them for a bit and here's my 2c: how about we simplify this
KIP in the following way. The first three rows are existing strategies that
we do not change, for compatibility.

--

auto.offset.reset             when out-of-range        when no committed offsets found
  none                        throw exception          throw exception
  earliest                    reset to earliest        reset to earliest
  latest                      reset to latest          reset to latest

auto.offset.reset.on.start    when out-of-range        when no committed offsets found
  earliest_on_start           throw exception          reset to earliest
  latest_on_start             throw exception          reset to latest

On Fri, Jul 8, 2022 at 7:01 AM hudeqi <16120...@bjtu.edu.cn> wrote:

> Regarding the option to integrate repair logic in "latest", I understand
> your concern about this approach: backward compatibility.
> But we should have a consensus: the problem of data loss due to expand
> partitions is indeed caused by kafka's own design mechanism. The user
> configuration "latest" may be due to the consideration of not wanting to
> consume from earliest when firstly deploy app, or too much lag, or
> consumption exceeds the maximum offset, and then consume directly from the
> latest. As for expanding partition, the user will definitely not want to
> consume from the latest, unless he clearly knows what this means.
> Therefore, it is necessary to solve this problem, at the same time, without
> causing other problems.
> Therefore, for the method of adding an "init.offset.reset" option, there
> will be a problem, that is, this configuration must be set to "earliest" to
> avoid this situation, but it will also cause the new group to be consumed
> from earliest. , which goes against the idea of ​​consuming from the latest
> at the beginning (brings other problems).
> The same is true for the method of setting auto.offset.reset to "earliest"
> and seekingToEnd on new deployments: in order to avoid this case,
> "auto.offset.reset" has no choice but to set "earliest", when the
> consumption is advanced, it will also reset to the earliest, causing
> duplication (bringing other problems).
> So I think it's best to fix it in a black box to fundamentally solve it.
> It does not require users to perceive this problem, nor does the user's
> understanding of "auto.offset.reset" need to be changed, and there will be
> no complexity caused by redundant parameter configuration(and users doesn't
> necessarily know how to combine these parameters to use it). As for the
> compatibility issue, I think it is enough to enrich the test cases after
> the repair, what do you think?
>
> "Matthias J. Sax" 写道:
> > I am not sure if we can/should change the behavior of existing
> > "latest/earliest" due to backward compatibility concerns. While I agree
> > that many users might not know the fine details how both behave, it
> > would still be a change that might break other people that do understand
> > the details and rely on it.
> >
> > I also agree that having both "latest" and "safe_latest" might be
> > difficult, as users might not know which one to choose?
> >
> > Maybe we should have two configs instead of one? `auto.offset.reset`, as
> > the name suggests, resets the offset automatically, and thus it's
> > current behavior is actu

Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-07-13 Thread Guozhang Wang
- clicked "send" by mistake... here's the full email -

Hello Deqi,

Thanks for bringing this KIP, and sorry for getting back to you so late.

I do think that separating the reset policy for the two scenarios: 1) when
we get an out-of-range when polling records, likely due to log being
truncated, 2) when we start fetching and there's no committed offset, would
be preferable as we have seen many places where people would prefer to use
different strategies for these two scenarios. At the same time, I also
share other's concerns that the current proposal is trying to mingle too
many features together which makes it unnecessarily complicated and also
makes the compatibility story trickier.

My reading is that you want to achieve the following things within this
KIP:

a) separate the two scenarios for reset strategies, as I mentioned above.
This to me is the most compelling motivation.
b) introduce new reset policies, a.k.a. "nearest", in addition to earliest
and latest. This has been proposed quite a while ago to add more
flexibilities in the policies.
c) tailor the reset policy of a topic for a specific consumer group. I.e.
when a consumer group starts consuming, we want to let it start from
"latest", but once the consumer group starts, newly added partitions would
be using "earliest" instead to avoid data loss.

I think trying to compound all these things in this one KIP makes it a bit
too mingled, and complicated. Plus, we also need to make sure that we are
compatible with the old behaviors if users only set "earliest" or "latest",
and expect that to impact both scenarios.

I thought about them for a bit and here's my 2c: how about we simplify this
KIP in the following way. The first three rows are existing strategies that
we do not change, for compatibility.

--

auto.offset.reset             when out-of-range        when no committed offsets found
  none                        throw exception          throw exception
  earliest                    reset to earliest        reset to earliest
  latest                      reset to latest          reset to latest

auto.offset.reset.on.no.initial.offset
  none               when out-of-range: fall back to auto.offset.reset
                     when no committed offsets found: throw exception
  earliest           when out-of-range: fall back to auto.offset.reset
                     when no committed offsets found: reset to earliest
  latest             when out-of-range: fall back to auto.offset.reset
                     when no committed offsets found: reset to latest
  latest_on_start    when out-of-range: fall back to auto.offset.reset
                     when no committed offsets found: reset to latest when the consumer
                     group is starting (implementation-wise, we do not rely on timestamps,
                     we just check whether this is the first time the consumer gets an
                     assignment); otherwise fall back to auto.offset.reset
  earliest_on_start  when out-of-range: fall back to auto.offset.reset
                     when no committed offsets found: reset to earliest when the consumer
                     group is starting (same as above); otherwise fall back to
                     auto.offset.reset

auto.offset.reset.on.invalid.offset
  none               when out-of-range: throw exception
                     when no committed offsets found: fall back to auto.offset.reset
  earliest           when out-of-range: reset to earliest
                     when no committed offsets found: fall back to auto.offset.reset
  latest             when out-of-range: reset to latest
                     when no committed offsets found: fall back to auto.offset.reset
  nearest            when out-of-range: reset to latest if the current offset is larger
                     than log.end, or to earliest if the current offset is smaller than
                     log.start
                     when no committed offsets found: fall back to auto.offset.reset

--

With this slightly modified proposal, we can still cover all three
motivations, e.g.:

a) "I want to use a different reset policy for out-of-range, and when no
committed offsets upon starting": auto.offset.reset = latest,
auto.offset.reset.on.invalid.offset = none.
b) "I want to use a flexible reset policy for out-of-range":
auto.offset.reset = latest, auto.offset.reset.on.invalid.offset = nearest.
b) "I want to not lose data upon new partitions after my consumer has
started a flexible reset policy for out-of-range": auto.offset.reset =
earliest, auto.offset.reset.on.no.initial.offset = latest_on_start.
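
In consumer properties these combinations would look like the following (just a 
sketch, assuming a Properties object named props used to build the consumer 
config; the two new configuration names are of course only proposals at this 
point):

    // a) different policies for out-of-range vs. no committed offset on start
    props.put("auto.offset.reset", "latest");
    props.put("auto.offset.reset.on.invalid.offset", "none");

    // b) flexible out-of-range handling
    props.put("auto.offset.reset", "latest");
    props.put("auto.offset.reset.on.invalid.offset", "nearest");

    // c) no data loss on partitions added after the consumer group has started
    props.put("auto.offset.reset", "earliest");
    props.put("auto.offset.reset.on.no.initial.offset", "latest_on_start");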


Please let me know what you think.




On Wed, Jul 13, 2022 at 10:30 AM Guozhang Wang  wrote:

> Hello Deqi,
>
> Thanks for bringing this KIP, and sorry for getting back to you so late.
>
> I do think that separating the reset policy for the two scenarios: 1) when
> we get an out-of-range when polling records, likely due to log being
> truncated, 2) when we start fetching and there's no committed offset, would
> be preferable as we have seen many places where people would prefer to use
> different strategies for these two scenarios. At the same time, I also
> share other's concerns that the current proposal is trying to mingle too
> many features together which makes it unnece

Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2023-03-08 Thread hudeqi
Hello, have those who discussed this before seen my summary below? New 
participants are also welcome to join the discussion.

"hudeqi" <16120...@bjtu.edu.cn>写道:
> Long time no see, this issue has been discussed for a long time, now please 
> allow me to summarize this issue, and then everyone can help to see which 
> direction this issue should go in?
> 
> There are two problems to be solved by this kip:
> 1. Solve the problem that when the client configures the "auto.offset.reset" 
> to latest, the new partition data may be lost when the consumer resets the 
> offset to the latest after expanding the topic partition.
> 
> 2. In addition to the "earliest", "latest", and "none" provided by the 
> existing "auto.offset.reset", it also provides more abundant parameters, such 
> as "latest_on_start" (application startup is reset to latest, and an 
> exception is thrown if out of range occurs), "earliest_on_start" (application 
> startup is reset to earliest, and an exception is thrown if out of range 
> occurs), "nearest"(determined by "auto.offset.reset" when the program starts, 
> and choose earliest or latest according to the distance between the current 
> offset and log start offset and log end offset when out of range occurs).
> 
> According to the discussion results of the members above, it seems that there 
> are concerns about adding these additional offset reset mechanisms: 
> complexity and compatibility. In fact, these parameters do have corresponding 
> benefits. Therefore, based on the above discussion results, I have sorted out 
> two solution directions. You can help me to see which direction to follow:
> 
> 1. The first one is to follow Guozhang's suggestion: keep the three 
> parameters of "auto.offset.reset" and their meanings unchanged, reduce the 
> confusion for Kafka users, and solve the compatibility problem by the way. 
> Add these two parameters:
> a. "auto.offset.reset.on.no.initial.offse": Indicates the strategy used 
> to initialize the offset. The default value is the parameter configured by 
> "auto.offset.reset". If so, the strategy for initializing the offset remains 
> unchanged from the previous behavior, ensuring compatibility. If the 
> parameter is configured with "latest_on_start" or "earliest_on_start", then 
> the offset will be reset according to the configured semantics when 
> initializing the offset. In this way, the problem of data loss during 
> partition expansion can be solved: configure 
> "auto.offset.reset.on.no.initial.offset" to "latest_on_start", and configure 
> "auto.offset.reset" to earliest.
> b. "auto.offset.reset.on.invalid.offset": Indicates that the offset is 
> illegal or out of range occurs. The default value is the parameter configured 
> by "auto.offset.reset". If so, the processing of out of range is the same as 
> before to ensure compatibility. If "nearest" is configured, then the semantic 
> logic corresponding to "nearest" is used only for the case of out of range.
> 
> This solution ensures compatibility and ensures that the semantics of the 
> original configuration remain unchanged. Only two incremental configurations 
> are added to flexibly handle different situations.
> 
> 2. The second is to directly reduce the complexity of this problem, and 
> directly add the logic of resetting the initial offset of the newly expanded 
> partition to the earliest to "auto.offset.reset"="latest". In this way, Kafka 
> users do not need to perceive this subtle but useful change, and the 
> processing of other situations remains unchanged (without considering too 
> many rich offset processing mechanisms).
> 
> I hope you can help me with the direction of the solution to this issue, 
> thank you.
> 
> Best,
> hudeqi


Re: Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2022-05-30 Thread hudeqi
Hi, Ziming.
I thought about it again, and it might be better to add an additional 
auxiliaryStrategy, so that we can implement more auxiliary strategies in this 
way, not just nearest. What do you think?
Best,
hudeqi


> -Original Message-
> From: hudeqi <16120...@bjtu.edu.cn>
> Sent: 2022-05-30 15:33:45 (Monday)
> To: dev@kafka.apache.org
> Cc: 
> Subject: Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> 
> Thank you for your reply.
> According to the current implementation in pull request, it may not be 
possible to directly remove the enumeration value of nearest. However, on the 
whole, putting nearest in the OffsetResetStrategy enumeration class may cause 
some misunderstandings in use. There are two solutions. One is to make a 
defensive check on the value of "auto.offset.reset" when initializing the 
consumer, and the other one is that I change the implementation. For example, I 
add an additional variable named "auxiliaryStrategy" to the SubscriptionState 
class to reset offset when triggering out-of-range.
> Besides this problem, is there any other problem? What can I do next to 
push this kip up for adoption? This is my first time I do, I don't understand 
very well.
> 
> > -Original Message-
> > From: "deng ziming" 
> > Sent: 2022-05-30 13:23:53 (Monday)
> > To: dev@kafka.apache.org
> > Cc: 
> > Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> > 
> > Thank you for your reply.
> > 
> > According to your description, strategy=nearest is redundant, right? 
We only rely on nearest.offset.reset=true/false to handle OffsetOutOfRange, I 
think we can directly remove this enum value, WDYT?
> > 
> > --
> > Best,
> > Ziming
> > 
> > > On May 27, 2022, at 5:19 PM, hudeqi <16120...@bjtu.edu.cn> 
wrote:
> > > 
> > > Thank you for your attention and reply. Here are my reply to 
your questions:
> > > 
> > > 1. If strategy=safe_latest and there is not committed offset, 
whether the group is newly started is determined in this way: when the group is 
started, a timestamp "createTimeStamp" will be passed as the start time of the 
group. When the offset needs to be reset, the timestamp will be added to 
"ListOffsetsRequest" as a new field. The server compares the timestamp 
"createTimeStamp" with the timestamp "logStartTime", which is the first message 
time for each partition. If "createTimeStamp" "greater than "logStartTime" 
means that the group is newly started for this partition and consumed from the 
latest, otherwise it means that the partition is newly expanded and needs to be 
consumed from the earliest. For details, you can see related jira and pr.
> > > 
> > > 
> > > 
> > > 2. Strictly speaking, nearest is not a level strategy with 
earlyliest/latest/safe_latest/earliest_on_start/latest_on_start. It is more 
like an auxiliary strategy, which is only dealt with out-of-range. So if you 
set nearest.offset.reset=true, no matter what strategy "auto.offset.reset" is 
set to, it will be performed according to the strategy of nearest when 
out-of-range (to the earliest if it was under the range , or to the latest if 
it was over the range), and for the case where no committed offset, nearest 
will naturally have no effect, instead, it is determined by the main 
strategy(auto.offset.reset).
> > > 
> > > 
> > > 
> > > 
> > > 3. This I have added to the form of module "Proposed Changes" in 
kip-842.
> > > 
> > > 
> > > 
> > > 
> > > 4. The meaning of nearest.offset.reset has been clearly 
expressed in point 2, this configuration is disabled default, that is to say, 
when out-of-range, reset strategy is performed according to the main strategy 
(auto.offset.reset).
> > > 
> > > 
> > > 
> > >> -Original Message-
> > >> From: "deng ziming" 
> > >> Sent: 2022-05-27 16:02:53 (Friday)
> > >> To: dev@kafka.apache.org
> > >> Cc: 
> > >> Subject: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms
> > >> 
> > >> Thank you for this KIP, the motivation makes sense to me, 
left some questions:
> > >> 
> > >> 1. If strategy=safe_latest and there is not committed 
offset, we have 2 choices based on whether the group is started newly, can you 
elaborate on how can we decide the group is started newly? It would be clear.
> > >> 
> > >> 2. If strategy=nearest and there is not committed offset, 
its behavior is determined by the earliest, or latest, or safe_latest used 
together. can you elabo

Re: Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2023-03-08 Thread hudeqi
I am reposting the updated KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms

"hudeqi" <16120...@bjtu.edu.cn>写道:
> Hello, have any mates who have discussed it before seen it? Also welcome new 
> mates to discuss together.
> 
> "hudeqi" <16120...@bjtu.edu.cn>写道:
> > Long time no see, this issue has been discussed for a long time, now please 
> > allow me to summarize this issue, and then everyone can help to see which 
> > direction this issue should go in?
> > 
> > There are two problems to be solved by this kip:
> > 1. Solve the problem that when the client configures the 
> > "auto.offset.reset" to latest, the new partition data may be lost when the 
> > consumer resets the offset to the latest after expanding the topic 
> > partition.
> > 
> > 2. In addition to the "earliest", "latest", and "none" provided by the 
> > existing "auto.offset.reset", it also provides more abundant parameters, 
> > such as "latest_on_start" (application startup is reset to latest, and an 
> > exception is thrown if out of range occurs), "earliest_on_start" 
> > (application startup is reset to earliest, and an exception is thrown if 
> > out of range occurs), "nearest"(determined by "auto.offset.reset" when the 
> > program starts, and choose earliest or latest according to the distance 
> > between the current offset and log start offset and log end offset when out 
> > of range occurs).
> > 
> > According to the discussion results of the members above, it seems that 
> > there are concerns about adding these additional offset reset mechanisms: 
> > complexity and compatibility. In fact, these parameters do have 
> > corresponding benefits. Therefore, based on the above discussion results, I 
> > have sorted out two solution directions. You can help me to see which 
> > direction to follow:
> > 
> > 1. The first one is to follow Guozhang's suggestion: keep the three 
> > parameters of "auto.offset.reset" and their meanings unchanged, reduce the 
> > confusion for Kafka users, and solve the compatibility problem by the way. 
> > Add these two parameters:
> > a. "auto.offset.reset.on.no.initial.offse": Indicates the strategy used 
> > to initialize the offset. The default value is the parameter configured by 
> > "auto.offset.reset". If so, the strategy for initializing the offset 
> > remains unchanged from the previous behavior, ensuring compatibility. If 
> > the parameter is configured with "latest_on_start" or "earliest_on_start", 
> > then the offset will be reset according to the configured semantics when 
> > initializing the offset. In this way, the problem of data loss during 
> > partition expansion can be solved: configure 
> > "auto.offset.reset.on.no.initial.offset" to "latest_on_start", and 
> > configure "auto.offset.reset" to earliest.
> > b. "auto.offset.reset.on.invalid.offset": Indicates that the offset is 
> > illegal or out of range occurs. The default value is the parameter 
> > configured by "auto.offset.reset". If so, the processing of out of range is 
> > the same as before to ensure compatibility. If "nearest" is configured, 
> > then the semantic logic corresponding to "nearest" is used only for the 
> > case of out of range.
> > 
> > This solution ensures compatibility and ensures that the semantics of the 
> > original configuration remain unchanged. Only two incremental 
> > configurations are added to flexibly handle different situations.
> > 
> > 2. The second is to directly reduce the complexity of this problem, and 
> > directly add the logic of resetting the initial offset of the newly 
> > expanded partition to the earliest to "auto.offset.reset"="latest". In this 
> > way, Kafka users do not need to perceive this subtle but useful change, and 
> > the processing of other situations remains unchanged (without considering 
> > too many rich offset processing mechanisms).
> > 
> > I hope you can help me with the direction of the solution to this issue, 
> > thank you.
> > 
> > Best,
> > hudeqi


Re: Re: Re: [DISCUSS] KIP-842: Add richer group offset reset mechanisms

2023-03-26 Thread hudeqi
Is there any further attention on this KIP? 
Bumping this thread.

Best,
hudeqi

"hudeqi" <16120...@bjtu.edu.cn>写道:
> Hello, have any mates who have discussed it before seen it? Also welcome new 
> mates to discuss together.
> 
> "hudeqi" <16120...@bjtu.edu.cn>写道:
> > Long time no see, this issue has been discussed for a long time, now please 
> > allow me to summarize this issue, and then everyone can help to see which 
> > direction this issue should go in?
> > 
> > There are two problems to be solved by this kip:
> > 1. Solve the problem that when the client configures the 
> > "auto.offset.reset" to latest, the new partition data may be lost when the 
> > consumer resets the offset to the latest after expanding the topic 
> > partition.
> > 
> > 2. In addition to the "earliest", "latest", and "none" provided by the 
> > existing "auto.offset.reset", it also provides more abundant parameters, 
> > such as "latest_on_start" (application startup is reset to latest, and an 
> > exception is thrown if out of range occurs), "earliest_on_start" 
> > (application startup is reset to earliest, and an exception is thrown if 
> > out of range occurs), "nearest"(determined by "auto.offset.reset" when the 
> > program starts, and choose earliest or latest according to the distance 
> > between the current offset and log start offset and log end offset when out 
> > of range occurs).
> > 
> > According to the discussion results of the members above, it seems that 
> > there are concerns about adding these additional offset reset mechanisms: 
> > complexity and compatibility. In fact, these parameters do have 
> > corresponding benefits. Therefore, based on the above discussion results, I 
> > have sorted out two solution directions. You can help me to see which 
> > direction to follow:
> > 
> > 1. The first one is to follow Guozhang's suggestion: keep the three 
> > parameters of "auto.offset.reset" and their meanings unchanged, reduce the 
> > confusion for Kafka users, and solve the compatibility problem by the way. 
> > Add these two parameters:
> > a. "auto.offset.reset.on.no.initial.offse": Indicates the strategy used 
> > to initialize the offset. The default value is the parameter configured by 
> > "auto.offset.reset". If so, the strategy for initializing the offset 
> > remains unchanged from the previous behavior, ensuring compatibility. If 
> > the parameter is configured with "latest_on_start" or "earliest_on_start", 
> > then the offset will be reset according to the configured semantics when 
> > initializing the offset. In this way, the problem of data loss during 
> > partition expansion can be solved: configure 
> > "auto.offset.reset.on.no.initial.offset" to "latest_on_start", and 
> > configure "auto.offset.reset" to earliest.
> > b. "auto.offset.reset.on.invalid.offset": Indicates that the offset is 
> > illegal or out of range occurs. The default value is the parameter 
> > configured by "auto.offset.reset". If so, the processing of out of range is 
> > the same as before to ensure compatibility. If "nearest" is configured, 
> > then the semantic logic corresponding to "nearest" is used only for the 
> > case of out of range.
> > 
> > This solution ensures compatibility and ensures that the semantics of the 
> > original configuration remain unchanged. Only two incremental 
> > configurations are added to flexibly handle different situations.
> > 
> > 2. The second is to directly reduce the complexity of this problem, and 
> > directly add the logic of resetting the initial offset of the newly 
> > expanded partition to the earliest to "auto.offset.reset"="latest". In this 
> > way, Kafka users do not need to perceive this subtle but useful change, and 
> > the processing of other situations remains unchanged (without considering 
> > too many rich offset processing mechanisms).
> > 
> > I hope you can help me with the direction of the solution to this issue, 
> > thank you.
> > 
> > Best,
> > hudeqi