Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello John,

Thanks for the new PR, and sorry for zig-zagging this KIP design. I made a pass on the PR and it looks good to me overall. Left some minor comments.

Guozhang
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello all,

I have updated the KIP now. The diff is visible here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=165225765&selectedPageVersions=13&selectedPageVersions=12

The PR https://github.com/apache/kafka/pull/10137 is now available for reviews.

Thanks so much to you all for your help on this!
John
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks, Bill,

I was waiting for feedback on the "currentLag" proposal before updating the KIP. Since there haven't been any objections to my new proposal, I'm in the process of updating the KIP document right now.

Personally, I still like the original proposal to expose the metadata from the fetch responses as well, but there were too many side-effects of going that route.

Once I finish updating the wiki page, I'll ping here again to check for objections.

Thank you for taking another look!
-John
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for the KIP update John,

The KIP as it stands LGTM.

One question, in your previous comment you stated that the KIP would introduce the `currentLag()` method, but the KIP seems to show the `metadata()` implementation.
FWIW I like the metadata approach as it seems to provide information the consumer has access to when it receives the fetch response.

Thanks again.

Bill
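For anyone skimming the thread, here is a rough sketch of how a caller might interpret the value returned by the proposed `currentLag()`. It relies only on `java.util.OptionalLong`; the `LagCheck` class and its `Status` names are made up for illustration and are not part of the KIP or the PR. The one thing taken from the proposal is that an empty result means the lag is not known yet.

```java
import java.util.OptionalLong;

/** Hypothetical helper (not part of Kafka or the PR): classifies a cached lag reading. */
final class LagCheck {
    enum Status { UNKNOWN, CAUGHT_UP, BEHIND }

    /**
     * Classifies the kind of value a call such as consumer.currentLag(partition)
     * is proposed to return. An empty OptionalLong means the lag is not known yet,
     * for example because there is no position or no end offset.
     */
    static Status classify(OptionalLong lag) {
        if (!lag.isPresent()) {
            return Status.UNKNOWN;
        }
        // A lag of zero means the consumer has reached the known end offset.
        return lag.getAsLong() == 0L ? Status.CAUGHT_UP : Status.BEHIND;
    }
}
```

What a caller does in each of the three cases is up to the application; the sketch only separates "unknown" from "known and zero" so the two are not confused.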
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello all,

Since there haven't been any big objections to my proposed design fix, I'm updating the KIP now and opening the PR for reviews. Hopefully, we can get this merged quickly and unblock the system tests.

Thanks,
John
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for that idea, Chia-Ping,

I agree that it would be nice for users to have a single API to get all kinds of metadata, but I'd rather introduce it in a KIP that would expose more than one piece of metadata. At the moment, I am motivated to keep this as simple as possible, which means only exposing the lag, so it seems better to introduce just "currentLag".

A later KIP could introduce some kind of "local metadata" API and deprecate this method without harm. That later KIP would be in a better position to take its time to find a good name, consider which pieces of metadata would be nice to have, etc.

Thanks sincerely for all your reviews,
John
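To make the "introduce `currentLag` now, add a composite later" path a bit more concrete, here is a minimal sketch of how a later API could absorb the single-value method. Everything in it is an assumption for illustration: `LocalMetadata`, `LagView`, and `localMetadata(...)` are hypothetical names, and partitions are plain strings instead of `TopicPartition` so the snippet compiles without Kafka on the classpath.

```java
import java.util.OptionalLong;

/** Hypothetical composite of locally cached partition metadata; here it carries only the lag. */
final class LocalMetadata {
    private final OptionalLong lag;

    LocalMetadata(OptionalLong lag) {
        this.lag = lag;
    }

    OptionalLong lag() {
        return lag;
    }
}

/** Hypothetical stand-in for the relevant slice of the consumer API. */
interface LagView {
    /** What a later "local metadata" method might look like. */
    LocalMetadata localMetadata(String partition);

    /** The single-value method could then be deprecated and simply delegate. */
    @Deprecated
    default OptionalLong currentLag(String partition) {
        return localMetadata(partition).lag();
    }
}
```

Under this shape, existing callers of `currentLag` keep working unchanged, and new fields can be added to the composite without adding further methods to the interface.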
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
That new solution is good to me as it brings fewer changes than either options or config. one minor question - Could we return a composite object rather than OptionalLong? For example: class Metadata { long lag; } There are a bunch of 'metadata' for a partition and it is possible we want to expose more information of a partition in the future. The composite object open a room to carry more information without adding more Public APIs to Consumer. On 2021/02/17 05:21:06, John Roesler wrote: > Hello again, all, > > Thank you for your feedback and patience. I am hopeful that > I have been able to come up with a solution that will > satisfy everyone. > > Under a previous design iteration of the desired task idling > semantics in KIP-695, we did indeed require the behavior > change, but while I was considering all of your feedback, I > realized that that requirement is no longer present. > > Just a little more detail in case you are curious: I had > initially wanted to make the task idling semantics > absolutely free of weird timing effects based on how > frequently Streams happens to call poll, so I built in a > mechanism that would force Streams to get back a _fresh_ lag > reponse from poll before proceeding to enforce processing. > However, this resulted in a severe performance degradation, > so I backed off to use a cache of the lag metadata. > > What I realized just now is that under this change, there's > no longer a need to return metadata (or change behavior) in > Consumer#poll at all. The lag cache in Streams would always > be identical to the one inside the Consumer's > SubscriptionState. Therefore, I can instead just expose the > Consumer's lag in a new API. Here is what I propose: > > /** > * Get the consumer's current lag on the partition. Returns > an "empty" {@link OptionalLong} if the lag is not known, > * for example if there is no position yet, or if the end > offset is not known yet. 
> * > * > * This method uses locally cached metadata and never makes > a remote call. > * > * @param topicPartition The partition to get the lag for. > * > * @return This {@code Consumer} instance's current lag for > the given partition. > * > * @throws IllegalStateException if the {@code > topicPartition} is not assigned > **/ > @Override > public OptionalLong currentLag( > TopicPartition topicPartition > ); > > > > With this new API, we have a handy way to find out the lag > of the consumer without ever incurring a remote call. There > is no unnecessarily low-level config option to confuse > users. And there is no change in the behavior of any > existinng API to break users' programs. > > I have implemented this end-to-end in a preview PR: > https://github.com/apache/kafka/pull/10137 > > If this proposal sounds good to all of you, then I will go > ahead and update the KIP. > > Sincerely yours, > -John > > On Thu, 2021-02-11 at 12:16 +, Chia-Ping Tsai wrote: > > here is my two cents. If the behavior eventually gets changed (return on > > response), the config is more suitable as it is easier to be deprecated > > (less changes). For example, we can introduce the config in 2.8 and then > > deprecate it in 2.9. 3.0 removes the config and supports only > > return-on-response. > > > > > > > > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote: > > > Hey John, I know I'm a bit late to this party but just for the record, > > > I don't think it's *totally *unreasonable for a user to take up the "poll > > > on max timeout and assume some records will be returned" approach. > > > And I also can imagine plenty of manually-assigned consumers > > > implemented doing exactly that. > > > > > > It's too bad that we're hitting this during the 2.8 release. If we were > > > having this discussion in the context of 3.0, then I'd say go for it, > > > since > > > it's a breaking change that would just require some modification to the > > > applications' poll loop. 
> > > > > > A good analogy here seems to be spurious wakeups -- you generally > > > assume that a waiting thread has woken up due to a notify event in > > > another thread, but the docs always make it very clear up front that > > > this can happen "spuriously" and therefore you need to recheck whatever > > > condition you were waiting on before assuming the thread should proceed. > > > > > > Since we *didn't* document this possibility up front in the case of > > > poll(), > > > it > > > seems unfair to suddenly change the behavior in a supposedly non-breaking > > > release. Imagine how many programs would break if spurious wakeups > > > were suddenly introduced in a release, rather than warned about from > > > the get-go (not a perfect analogy, far more programs rely on wait/notify > > > than on poll() returning records, but I think the point still stands. > > > > > > For the record, I also agree with Ismael that a config doesn't feel ideal. > > > There are already enough configs to present a steep learning curve, so > >
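For readers following the composite-object suggestion at the top of this message, here is a minimal sketch of what such a return type could look like. The class name `PartitionMetadata`, its factory methods, and its single field are illustrative assumptions, not anything from the KIP or the PR; the point is only that new fields could be added later without adding another method to Consumer.

```java
import java.util.OptionalLong;

// Hypothetical composite return type (name and shape are assumptions for illustration).
// Wrapping the lag in an object means more per-partition fields can be added later
// without changing the signature of the method that returns it.
final class PartitionMetadata {
    private final OptionalLong lag;

    private PartitionMetadata(OptionalLong lag) {
        this.lag = lag;
    }

    // Lag is not known yet, e.g. no position or no end offset.
    static PartitionMetadata unknown() {
        return new PartitionMetadata(OptionalLong.empty());
    }

    // Lag is known from locally cached metadata.
    static PartitionMetadata ofLag(long lag) {
        return new PartitionMetadata(OptionalLong.of(lag));
    }

    OptionalLong lag() {
        return lag;
    }
}

final class PartitionMetadataDemo {
    public static void main(String[] args) {
        PartitionMetadata known = PartitionMetadata.ofLag(42L);
        PartitionMetadata unknown = PartitionMetadata.unknown();
        System.out.println("known lag present: " + known.lag().isPresent());
        System.out.println("unknown lag present: " + unknown.lag().isPresent());
    }
}
```

The trade-off is one extra type in the public API in exchange for room to grow; a bare OptionalLong is simpler if lag is the only value ever exposed.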
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello again, all,

Thank you for your feedback and patience. I am hopeful that I have been able to come up with a solution that will satisfy everyone.

Under a previous design iteration of the desired task idling semantics in KIP-695, we did indeed require the behavior change, but while I was considering all of your feedback, I realized that that requirement is no longer present.

Just a little more detail in case you are curious: I had initially wanted to make the task idling semantics absolutely free of weird timing effects based on how frequently Streams happens to call poll, so I built in a mechanism that would force Streams to get back a _fresh_ lag response from poll before proceeding to enforce processing. However, this resulted in a severe performance degradation, so I backed off to use a cache of the lag metadata.

What I realized just now is that under this change, there's no longer a need to return metadata (or change behavior) in Consumer#poll at all. The lag cache in Streams would always be identical to the one inside the Consumer's SubscriptionState. Therefore, I can instead just expose the Consumer's lag in a new API. Here is what I propose:

    /**
     * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
     * for example if there is no position yet, or if the end offset is not known yet.
     *
     * This method uses locally cached metadata and never makes a remote call.
     *
     * @param topicPartition The partition to get the lag for.
     *
     * @return This {@code Consumer} instance's current lag for the given partition.
     *
     * @throws IllegalStateException if the {@code topicPartition} is not assigned
     **/
    @Override
    public OptionalLong currentLag(TopicPartition topicPartition);

With this new API, we have a handy way to find out the lag of the consumer without ever incurring a remote call. There is no unnecessarily low-level config option to confuse users.
And there is no change in the behavior of any existing API to break users' programs.

I have implemented this end-to-end in a preview PR: https://github.com/apache/kafka/pull/10137

If this proposal sounds good to all of you, then I will go ahead and update the KIP.

Sincerely yours,
-John

On Thu, 2021-02-11 at 12:16 +, Chia-Ping Tsai wrote: > here is my two cents. If the behavior eventually gets changed (return on > response), the config is more suitable as it is easier to be deprecated (less > changes). For example, we can introduce the config in 2.8 and then deprecate > it in 2.9. 3.0 removes the config and supports only return-on-response. > > > > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote: > > Hey John, I know I'm a bit late to this party but just for the record, > > I don't think it's *totally* unreasonable for a user to take up the "poll > > on max timeout and assume some records will be returned" approach. > > And I also can imagine plenty of manually-assigned consumers > > implemented doing exactly that. > > > > It's too bad that we're hitting this during the 2.8 release. If we were > > having this discussion in the context of 3.0, then I'd say go for it, since > > it's a breaking change that would just require some modification to the > > applications' poll loop. > > > > A good analogy here seems to be spurious wakeups -- you generally > > assume that a waiting thread has woken up due to a notify event in > > another thread, but the docs always make it very clear up front that > > this can happen "spuriously" and therefore you need to recheck whatever > > condition you were waiting on before assuming the thread should proceed. > > > > Since we *didn't* document this possibility up front in the case of poll(), > > it > > seems unfair to suddenly change the behavior in a supposedly non-breaking > > release. 
Imagine how many programs would break if spurious wakeups > > were suddenly introduced in a release, rather than warned about from > > the get-go (not a perfect analogy, far more programs rely on wait/notify > > than on poll() returning records, but I think the point still stands. > > > > For the record, I also agree with Ismael that a config doesn't feel ideal. > > There are already enough configs to present a steep learning curve, so > > I would avoid adding one more wherever possible. And it does indeed > > seem possible to avoid here, since it's really just a boolean flag (rather > > than a semi-unbounded space, eg max.poll.interval.ms, or a constant > > value, eg group.id, where a config does feel appropriate). > > > > Given all that, I would personally advocate for the pollOptions overload. > > The obvious advantages here are: > > 1) it's more future-proof, in that we can avoid having a similar discussion > > if/when we want to consider other semantics changes to poll which some > > users may want while others would not > > 2) it leaves the door open to using poll with either semantics in a single > > consumer. I doubt that's going to be very common
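As a rough illustration of how a caller might consume the `currentLag` proposal in John's message above, here is a self-contained sketch. It does not use the Kafka client: `LagSource`, `CachedLagSource`, and `LagCheck` are stand-ins invented for this example, partitions are plain strings, and the three-way decision is a simplified assumption about how an application could interpret the value, not the Streams implementation from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Stand-in for the proposed lag accessor; NOT the real Kafka Consumer interface.
interface LagSource {
    OptionalLong currentLag(String topicPartition);
}

// Toy implementation backed by a local map, mirroring the "locally cached metadata,
// never a remote call" idea. Unlike the proposed API, it returns empty for unknown
// partitions instead of throwing IllegalStateException.
final class CachedLagSource implements LagSource {
    private final Map<String, Long> cache = new HashMap<>();

    void update(String topicPartition, long lag) {
        cache.put(topicPartition, lag);
    }

    @Override
    public OptionalLong currentLag(String topicPartition) {
        Long lag = cache.get(topicPartition);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }
}

final class LagCheck {
    enum Decision { WAIT_LAG_UNKNOWN, WAIT_FOR_FETCH, CAUGHT_UP }

    // Hypothetical policy: only treat a partition as caught up when the cached lag is zero.
    static Decision decide(LagSource source, String topicPartition) {
        OptionalLong lag = source.currentLag(topicPartition);
        if (!lag.isPresent()) {
            return Decision.WAIT_LAG_UNKNOWN; // no position or end offset yet
        }
        return lag.getAsLong() > 0 ? Decision.WAIT_FOR_FETCH : Decision.CAUGHT_UP;
    }
}
```

The empty-versus-zero distinction is the reason the proposal returns an OptionalLong rather than a plain long: "lag unknown" and "lag is zero" call for different reactions.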
Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization
here is my two cents. If the behavior eventually gets changed (return on response), the config is more suitable as it is easier to be deprecated (less changes). For example, we can introduce the config in 2.8 and then deprecate it in 2.9. 3.0 removes the config and supports only return-on-response. On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote: > Hey John, I know I'm a bit late to this party but just for the record, > I don't think it's *totally *unreasonable for a user to take up the "poll > on max timeout and assume some records will be returned" approach. > And I also can imagine plenty of manually-assigned consumers > implemented doing exactly that. > > It's too bad that we're hitting this during the 2.8 release. If we were > having this discussion in the context of 3.0, then I'd say go for it, since > it's a breaking change that would just require some modification to the > applications' poll loop. > > A good analogy here seems to be spurious wakeups -- you generally > assume that a waiting thread has woken up due to a notify event in > another thread, but the docs always make it very clear up front that > this can happen "spuriously" and therefore you need to recheck whatever > condition you were waiting on before assuming the thread should proceed. > > Since we *didn't* document this possibility up front in the case of poll(), > it > seems unfair to suddenly change the behavior in a supposedly non-breaking > release. Imagine how many programs would break if spurious wakeups > were suddenly introduced in a release, rather than warned about from > the get-go (not a perfect analogy, far more programs rely on wait/notify > than on poll() returning records, but I think the point still stands. > > For the record, I also agree with Ismael that a config doesn't feel ideal. > There are already enough configs to present a steep learning curve, so > I would avoid adding one more wherever possible. 
And it does indeed > seem possible to avoid here, since it's really just a boolean flag (rather > than a semi-unbounded space, eg max.poll.interval.ms, or a constant > value, eg group.id, where a config does feel appropriate). > > Given all that, I would personally advocate for the pollOptions overload. > The obvious advantages here are: > 1) it's more future-proof, in that we can avoid having a similar discussion > if/when we want to consider other semantics changes to poll which some > users may want while others would not > 2) it leaves the door open to using poll with either semantics in a single > consumer. I doubt that's going to be very common in terms of the specific > option we're discussing here, but it may be more useful for other options > we may add in the future > > Just my 2 cents. But if the pollOptions proposal would really add so much > additional work that it would cause the 2.8 release to be significantly > delayed, > then that's worth taking into account as well. > > On Wed, Feb 10, 2021 at 9:35 AM John Roesler wrote: > > > Hello again, all. > > > > I have submitted the PR: > > https://github.com/apache/kafka/pull/10096 > > > > Ismael chimed in on the PR review to indicate that the > > config approach may not be desirable. > > > > How strongly do we feel that the behavior change is > > unacceptable? It seems like most of the people involved felt > > the behavior change is ok (although the docs were wrong). > > > > The arguments against the behavior change were plausible, > > but hypothetical. > > > > Can everyone take a look at the PR and weigh in on whether > > the complexity of an extra config option is really worth it > > in this case? > > > > I have to confess I'm currently leaning more toward dropping > > the config and going back to the behavior change, while > > correcting the docs and the system test. > > > > While we are wavering on this point, the system tests > > continue to fail, and the 2.8.0 release is blocked. 
We > > should aim to make a call today. > > > > Thanks all, > > -John > > > > On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote: > > > Thanks everyone for chiming in here! I'd also prefer the config approach > > if > > > compared with API changes. > > > > > > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck wrote: > > > > > > > I meant to chime in earlier. > > > > > > > > I also like the `PollOptions` idea, but I have to agree that the config > > > > option would be the least disruptive approach. > > > > > > > > Thanks, > > > > Bill > > > > > > > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler > > wrote: > > > > > > > > > Thanks, all! > > > > > > > > > > It seems that the config I proposed is a solution that > > > > > everyone can be happy with, so I will go ahead with a PR to > > > > > fix that. > > > > > > > > > > I'll update the KIP after a round of PR reviews, in case > > > > > there are new concerns that arise. > > > > > > > > > > Thanks, > > > > > -John > > > > > > > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote: > > > > > > Thanks for
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hey John, I know I'm a bit late to this party but just for the record, I don't think it's *totally* unreasonable for a user to take up the "poll on max timeout and assume some records will be returned" approach. And I can also imagine plenty of manually-assigned consumers implemented to do exactly that.

It's too bad that we're hitting this during the 2.8 release. If we were having this discussion in the context of 3.0, then I'd say go for it, since it's a breaking change that would just require some modification to the applications' poll loop.

A good analogy here seems to be spurious wakeups -- you generally assume that a waiting thread has woken up due to a notify event in another thread, but the docs always make it very clear up front that this can happen "spuriously" and therefore you need to recheck whatever condition you were waiting on before assuming the thread should proceed.

Since we *didn't* document this possibility up front in the case of poll(), it seems unfair to suddenly change the behavior in a supposedly non-breaking release. Imagine how many programs would break if spurious wakeups were suddenly introduced in a release, rather than warned about from the get-go (not a perfect analogy, since far more programs rely on wait/notify than on poll() returning records, but I think the point still stands).

For the record, I also agree with Ismael that a config doesn't feel ideal. There are already enough configs to present a steep learning curve, so I would avoid adding one more wherever possible. And it does indeed seem possible to avoid here, since it's really just a boolean flag (rather than a semi-unbounded space, e.g. max.poll.interval.ms, or a constant value, e.g. group.id, where a config does feel appropriate).

Given all that, I would personally advocate for the pollOptions overload.
The obvious advantages here are: 1) it's more future-proof, in that we can avoid having a similar discussion if/when we want to consider other semantics changes to poll which some users may want while others would not 2) it leaves the door open to using poll with either semantics in a single consumer. I doubt that's going to be very common in terms of the specific option we're discussing here, but it may be more useful for other options we may add in the future Just my 2 cents. But if the pollOptions proposal would really add so much additional work that it would cause the 2.8 release to be significantly delayed, then that's worth taking into account as well. On Wed, Feb 10, 2021 at 9:35 AM John Roesler wrote: > Hello again, all. > > I have submitted the PR: > https://github.com/apache/kafka/pull/10096 > > Ismael chimed in on the PR review to indicate that the > config approach may not be desirable. > > How strongly do we feel that the behavior change is > unacceptable? It seems like most of the people involved felt > the behavior change is ok (although the docs were wrong). > > The arguments against the behavior change were plausible, > but hypothetical. > > Can everyone take a look at the PR and weigh in on whether > the complexity of an extra config option is really worth it > in this case? > > I have to confess I'm currently leaning more toward dropping > the config and going back to the behavior change, while > correcting the docs and the system test. > > While we are wavering on this point, the system tests > continue to fail, and the 2.8.0 release is blocked. We > should aim to make a call today. > > Thanks all, > -John > > On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote: > > Thanks everyone for chiming in here! I'd also prefer the config approach > if > > compared with API changes. > > > > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck wrote: > > > > > I meant to chime in earlier. 
> > > > > > I also like the `PollOptions` idea, but I have to agree that the config > > > option would be the least disruptive approach. > > > > > > Thanks, > > > Bill > > > > > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler > wrote: > > > > > > > Thanks, all! > > > > > > > > It seems that the config I proposed is a solution that > > > > everyone can be happy with, so I will go ahead with a PR to > > > > fix that. > > > > > > > > I'll update the KIP after a round of PR reviews, in case > > > > there are new concerns that arise. > > > > > > > > Thanks, > > > > -John > > > > > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote: > > > > > Thanks for providing more details. > > > > > > > > > > Adding a config might be the way a least resistance... I am fine > with > > > > that. > > > > > > > > > > -Matthias > > > > > > > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote: > > > > > > > vvv > > > > > > > long_poll.mode: return_on_records|return_on_response > > > > > > > > > > > > This idea LGTM. It not only makes minimum changes to current > behavior > > > > but also works for KIP-695. > > > > > > > > > > > > On 2021/02/04 16:07:11, John Roesler > wrote: > > > > > > > Hi Matthias, Chia-Ping, and Tom, > > >
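The spurious-wakeup analogy in Sophie's message refers to the standard guarded-wait idiom in Java, where the waiting thread rechecks its condition in a loop instead of trusting the wakeup. A minimal sketch follows; the `Mailbox` class is made up for this example and has nothing to do with the Kafka consumer. A poll loop that tolerates an early, empty return follows the same shape: check what came back before acting on it.

```java
// Illustrative single-slot mailbox (hypothetical class, for the analogy only).
final class Mailbox {
    private final Object lock = new Object();
    private String message; // null means nothing has been delivered yet

    void put(String newMessage) {
        synchronized (lock) {
            message = newMessage;
            lock.notifyAll();
        }
    }

    String take() {
        synchronized (lock) {
            // Recheck the condition after every wakeup: Object#wait is documented
            // to allow spurious wakeups, so a wakeup alone proves nothing.
            while (message == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            String taken = message;
            message = null;
            return taken;
        }
    }
}
```

Replacing the `while` with an `if` would still compile and usually work, which is exactly why the contract has to be documented up front.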
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello again, all. I have submitted the PR: https://github.com/apache/kafka/pull/10096 Ismael chimed in on the PR review to indicate that the config approach may not be desirable. How strongly do we feel that the behavior change is unacceptable? It seems like most of the people involved felt the behavior change is ok (although the docs were wrong). The arguments against the behavior change were plausible, but hypothetical. Can everyone take a look at the PR and weigh in on whether the complexity of an extra config option is really worth it in this case? I have to confess I'm currently leaning more toward dropping the config and going back to the behavior change, while correcting the docs and the system test. While we are wavering on this point, the system tests continue to fail, and the 2.8.0 release is blocked. We should aim to make a call today. Thanks all, -John On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote: > Thanks everyone for chiming in here! I'd also prefer the config approach if > compared with API changes. > > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck wrote: > > > I meant to chime in earlier. > > > > I also like the `PollOptions` idea, but I have to agree that the config > > option would be the least disruptive approach. > > > > Thanks, > > Bill > > > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler wrote: > > > > > Thanks, all! > > > > > > It seems that the config I proposed is a solution that > > > everyone can be happy with, so I will go ahead with a PR to > > > fix that. > > > > > > I'll update the KIP after a round of PR reviews, in case > > > there are new concerns that arise. > > > > > > Thanks, > > > -John > > > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote: > > > > Thanks for providing more details. > > > > > > > > Adding a config might be the way a least resistance... I am fine with > > > that. 
> > > > > > > > -Matthias > > > > > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote: > > > > > > vvv > > > > > > long_poll.mode: return_on_records|return_on_response > > > > > > > > > > This idea LGTM. It not only makes minimum changes to current behavior > > > but also works for KIP-695. > > > > > > > > > > On 2021/02/04 16:07:11, John Roesler wrote: > > > > > > Hi Matthias, Chia-Ping, and Tom, > > > > > > > > > > > > Thanks for the thoughtful replies! > > > > > > > > > > > > Re: poll(~forever~) to block indefinitely on records: > > > > > > Thanks for your dilligence, Chia-Ping. While I wouldn't > > > > > > personally recommend for anyone to write code that blocks > > > > > > forever on I/O, I do agree this is something that "real > > > > > > people" may want to do. > > > > > > > > > > > > Just a note for the record, this approach should only be > > > > > > used in conjunction with a manual assignment. If people are > > > > > > using a group subscription, they're setting themselves up to > > > > > > get kicked out of the group when there is low volume of > > > > > > updates on the topic. And then, when they get kicked out, > > > > > > they will never know it because they're just going to be > > > > > > blocked in `poll()` the whole time. > > > > > > > > > > > > However, if you don't participate in a group and just: > > > > > > 1 assign(partitions) > > > > > > 2 poll(forever), > > > > > > you should indeed expect to return from poll only when you > > > > > > have records. > > > > > > > > > > > > This possibility is the turning point for me. I'd like to > > > > > > alter my proposal to an opt-in config, detailed below. > > > > > > > > > > > > Re: Javadoc: > > > > > > Thanks for pointing that out. It does seem like, if we do > > > > > > decide to change behavior, we should adjust the Javadoc to > > > > > > say so. 
That was an oversight on my part, and I daresay that > > > > > > if I had done that initially, it would have saved Rajini > > > > > > from having to dig into the code to pinpoint the cause of > > > > > > those test failures. > > > > > > > > > > > > Re: PollOptions: > > > > > > I actually like this option quite a bit. It seems like this > > > > > > would be warranted if we expect someone to want to use the > > > > > > same Consumer instance in both "return on metadata or > > > > > > records" and "return on only records" mode. Otherwise, we > > > > > > might as well introduce a new config. > > > > > > > > > > > > It also seems like the behavior I proposed in this KIP is > > > > > > somewhat "advanced", so I could certainly see leaving it off > > > > > > by default and offering an opt-in config. > > > > > > > > > > > > How does everyone feel about this opt-in config: > > > > > > > > > > > > vvv > > > > > > long_poll.mode: return_on_records|return_on_response > > > > > > > > > > > > doc: > > > > > > * return_on_records: (default) a call to > > > > > > Consumer#poll(timeout) will block up to the timeout and > > > > > > return early if records are received. > > > > > > * return_on_response: a call to Cons
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks everyone for chiming in here! I'd also prefer the config approach if compared with API changes. On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck wrote: > I meant to chime in earlier. > > I also like the `PollOptions` idea, but I have to agree that the config > option would be the least disruptive approach. > > Thanks, > Bill > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler wrote: > > > Thanks, all! > > > > It seems that the config I proposed is a solution that > > everyone can be happy with, so I will go ahead with a PR to > > fix that. > > > > I'll update the KIP after a round of PR reviews, in case > > there are new concerns that arise. > > > > Thanks, > > -John > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote: > > > Thanks for providing more details. > > > > > > Adding a config might be the way a least resistance... I am fine with > > that. > > > > > > -Matthias > > > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote: > > > > > vvv > > > > > long_poll.mode: return_on_records|return_on_response > > > > > > > > This idea LGTM. It not only makes minimum changes to current behavior > > but also works for KIP-695. > > > > > > > > On 2021/02/04 16:07:11, John Roesler wrote: > > > > > Hi Matthias, Chia-Ping, and Tom, > > > > > > > > > > Thanks for the thoughtful replies! > > > > > > > > > > Re: poll(~forever~) to block indefinitely on records: > > > > > Thanks for your dilligence, Chia-Ping. While I wouldn't > > > > > personally recommend for anyone to write code that blocks > > > > > forever on I/O, I do agree this is something that "real > > > > > people" may want to do. > > > > > > > > > > Just a note for the record, this approach should only be > > > > > used in conjunction with a manual assignment. If people are > > > > > using a group subscription, they're setting themselves up to > > > > > get kicked out of the group when there is low volume of > > > > > updates on the topic. 
And then, when they get kicked out, > > > > > they will never know it because they're just going to be > > > > > blocked in `poll()` the whole time. > > > > > > > > > > However, if you don't participate in a group and just: > > > > > 1 assign(partitions) > > > > > 2 poll(forever), > > > > > you should indeed expect to return from poll only when you > > > > > have records. > > > > > > > > > > This possibility is the turning point for me. I'd like to > > > > > alter my proposal to an opt-in config, detailed below. > > > > > > > > > > Re: Javadoc: > > > > > Thanks for pointing that out. It does seem like, if we do > > > > > decide to change behavior, we should adjust the Javadoc to > > > > > say so. That was an oversight on my part, and I daresay that > > > > > if I had done that initially, it would have saved Rajini > > > > > from having to dig into the code to pinpoint the cause of > > > > > those test failures. > > > > > > > > > > Re: PollOptions: > > > > > I actually like this option quite a bit. It seems like this > > > > > would be warranted if we expect someone to want to use the > > > > > same Consumer instance in both "return on metadata or > > > > > records" and "return on only records" mode. Otherwise, we > > > > > might as well introduce a new config. > > > > > > > > > > It also seems like the behavior I proposed in this KIP is > > > > > somewhat "advanced", so I could certainly see leaving it off > > > > > by default and offering an opt-in config. > > > > > > > > > > How does everyone feel about this opt-in config: > > > > > > > > > > vvv > > > > > long_poll.mode: return_on_records|return_on_response > > > > > > > > > > doc: > > > > > * return_on_records: (default) a call to > > > > > Consumer#poll(timeout) will block up to the timeout and > > > > > return early if records are received. > > > > > * return_on_response: a call to Consumer#poll(timeout) will > > > > > block up to the timeout and return early if any fetch > > > > > response is received. 
Use this option to get updates from > > > > > Consumer#metadata() even if Consumer#records() is empty. > > > > > ^^^ > > > > > > > > > > Thanks, > > > > > John > > > > > > > > > > On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote: > > > > > > Hi, > > > > > > > > > > > > The Javadoc for KafkaConsumer#poll() includes the following: > > > > > > > > > > > > * This method returns immediately if there are records available. > > *Otherwise, > > > > > > > it will await the passed timeout.* > > > > > > > * If the timeout expires, an empty record set will be returned. > > Note that > > > > > > > this method may block beyond the > > > > > > > * timeout in order to execute custom {@link > > ConsumerRebalanceListener} > > > > > > > callbacks. > > > > > > > > > > > > > > > > > > > In other words: If the method returns before the timeout there > > must be > > > > > > records in the method result. After the timeout has passed there > > may be no > > > > > > records. It might block for longer than the timeout. So I think > > r
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
I meant to chime in earlier. I also like the `PollOptions` idea, but I have to agree that the config option would be the least disruptive approach. Thanks, Bill On Fri, Feb 5, 2021 at 6:12 PM John Roesler wrote: > Thanks, all! > > It seems that the config I proposed is a solution that > everyone can be happy with, so I will go ahead with a PR to > fix that. > > I'll update the KIP after a round of PR reviews, in case > there are new concerns that arise. > > Thanks, > -John > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote: > > Thanks for providing more details. > > > > Adding a config might be the way a least resistance... I am fine with > that. > > > > -Matthias > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote: > > > > vvv > > > > long_poll.mode: return_on_records|return_on_response > > > > > > This idea LGTM. It not only makes minimum changes to current behavior > but also works for KIP-695. > > > > > > On 2021/02/04 16:07:11, John Roesler wrote: > > > > Hi Matthias, Chia-Ping, and Tom, > > > > > > > > Thanks for the thoughtful replies! > > > > > > > > Re: poll(~forever~) to block indefinitely on records: > > > > Thanks for your dilligence, Chia-Ping. While I wouldn't > > > > personally recommend for anyone to write code that blocks > > > > forever on I/O, I do agree this is something that "real > > > > people" may want to do. > > > > > > > > Just a note for the record, this approach should only be > > > > used in conjunction with a manual assignment. If people are > > > > using a group subscription, they're setting themselves up to > > > > get kicked out of the group when there is low volume of > > > > updates on the topic. And then, when they get kicked out, > > > > they will never know it because they're just going to be > > > > blocked in `poll()` the whole time. 
> > > > > > > > However, if you don't participate in a group and just: > > > > 1 assign(partitions) > > > > 2 poll(forever), > > > > you should indeed expect to return from poll only when you > > > > have records. > > > > > > > > This possibility is the turning point for me. I'd like to > > > > alter my proposal to an opt-in config, detailed below. > > > > > > > > Re: Javadoc: > > > > Thanks for pointing that out. It does seem like, if we do > > > > decide to change behavior, we should adjust the Javadoc to > > > > say so. That was an oversight on my part, and I daresay that > > > > if I had done that initially, it would have saved Rajini > > > > from having to dig into the code to pinpoint the cause of > > > > those test failures. > > > > > > > > Re: PollOptions: > > > > I actually like this option quite a bit. It seems like this > > > > would be warranted if we expect someone to want to use the > > > > same Consumer instance in both "return on metadata or > > > > records" and "return on only records" mode. Otherwise, we > > > > might as well introduce a new config. > > > > > > > > It also seems like the behavior I proposed in this KIP is > > > > somewhat "advanced", so I could certainly see leaving it off > > > > by default and offering an opt-in config. > > > > > > > > How does everyone feel about this opt-in config: > > > > > > > > vvv > > > > long_poll.mode: return_on_records|return_on_response > > > > > > > > doc: > > > > * return_on_records: (default) a call to > > > > Consumer#poll(timeout) will block up to the timeout and > > > > return early if records are received. > > > > * return_on_response: a call to Consumer#poll(timeout) will > > > > block up to the timeout and return early if any fetch > > > > response is received. Use this option to get updates from > > > > Consumer#metadata() even if Consumer#records() is empty. 
> > > > ^^^ > > > > > > > > Thanks, > > > > John > > > > > > > > On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote: > > > > > Hi, > > > > > > > > > > The Javadoc for KafkaConsumer#poll() includes the following: > > > > > > > > > > * This method returns immediately if there are records available. > *Otherwise, > > > > > > it will await the passed timeout.* > > > > > > * If the timeout expires, an empty record set will be returned. > Note that > > > > > > this method may block beyond the > > > > > > * timeout in order to execute custom {@link > ConsumerRebalanceListener} > > > > > > callbacks. > > > > > > > > > > > > > > > > In other words: If the method returns before the timeout there > must be > > > > > records in the method result. After the timeout has passed there > may be no > > > > > records. It might block for longer than the timeout. So I think > returning > > > > > with empty records before at least the given timeout has passed > breaks that > > > > > contract. > > > > > > > > > > A not-much-prettier alternative to adding a new > > > > > pollForRecordsOrMetadata(Duration) method could be overloading > poll() to > > > > > take an additional parameter which controlled whether an early > return with > > > > > empty records was allowed. Or a `
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks, all! It seems that the config I proposed is a solution that everyone can be happy with, so I will go ahead with a PR to fix that. I'll update the KIP after a round of PR reviews, in case there are new concerns that arise. Thanks, -John On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote: > Thanks for providing more details. > > Adding a config might be the path of least resistance... I am fine with that. > > -Matthias > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote: > > > vvv > > > long_poll.mode: return_on_records|return_on_response > > > > This idea LGTM. It not only makes minimum changes to current behavior but > > also works for KIP-695. > > > > On 2021/02/04 16:07:11, John Roesler wrote: > > > Hi Matthias, Chia-Ping, and Tom, > > > > > > Thanks for the thoughtful replies! > > > > > > Re: poll(~forever~) to block indefinitely on records: > > > Thanks for your diligence, Chia-Ping. While I wouldn't > > > personally recommend for anyone to write code that blocks > > > forever on I/O, I do agree this is something that "real > > > people" may want to do. > > > > > > Just a note for the record, this approach should only be > > > used in conjunction with a manual assignment. If people are > > > using a group subscription, they're setting themselves up to > > > get kicked out of the group when there is low volume of > > > updates on the topic. And then, when they get kicked out, > > > they will never know it because they're just going to be > > > blocked in `poll()` the whole time. > > > > > > However, if you don't participate in a group and just: > > > 1 assign(partitions) > > > 2 poll(forever), > > > you should indeed expect to return from poll only when you > > > have records. > > > > > > This possibility is the turning point for me. I'd like to > > > alter my proposal to an opt-in config, detailed below. > > > > > > Re: Javadoc: > > > Thanks for pointing that out. 
It does seem like, if we do > > > decide to change behavior, we should adjust the Javadoc to > > > say so. That was an oversight on my part, and I daresay that > > > if I had done that initially, it would have saved Rajini > > > from having to dig into the code to pinpoint the cause of > > > those test failures. > > > > > > Re: PollOptions: > > > I actually like this option quite a bit. It seems like this > > > would be warranted if we expect someone to want to use the > > > same Consumer instance in both "return on metadata or > > > records" and "return on only records" mode. Otherwise, we > > > might as well introduce a new config. > > > > > > It also seems like the behavior I proposed in this KIP is > > > somewhat "advanced", so I could certainly see leaving it off > > > by default and offering an opt-in config. > > > > > > How does everyone feel about this opt-in config: > > > > > > vvv > > > long_poll.mode: return_on_records|return_on_response > > > > > > doc: > > > * return_on_records: (default) a call to > > > Consumer#poll(timeout) will block up to the timeout and > > > return early if records are received. > > > * return_on_response: a call to Consumer#poll(timeout) will > > > block up to the timeout and return early if any fetch > > > response is received. Use this option to get updates from > > > Consumer#metadata() even if Consumer#records() is empty. > > > ^^^ > > > > > > Thanks, > > > John > > > > > > On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote: > > > > Hi, > > > > > > > > The Javadoc for KafkaConsumer#poll() includes the following: > > > > > > > > * This method returns immediately if there are records available. > > > > *Otherwise, > > > > > it will await the passed timeout.* > > > > > * If the timeout expires, an empty record set will be returned. Note > > > > > that > > > > > this method may block beyond the > > > > > * timeout in order to execute custom {@link ConsumerRebalanceListener} > > > > > callbacks. 
> > > > > > > > > > > > > In other words: If the method returns before the timeout there must be > > > > records in the method result. After the timeout has passed there may be > > > > no > > > > records. It might block for longer than the timeout. So I think > > > > returning > > > > with empty records before at least the given timeout has passed breaks > > > > that > > > > contract. > > > > > > > > A not-much-prettier alternative to adding a new > > > > pollForRecordsOrMetadata(Duration) method could be overloading poll() to > > > > take an additional parameter which controlled whether an early return > > > > with > > > > empty records was allowed. Or a `poll(PollOptions)`. In the long run it > > > > could be a mistake to include in the method name exactly what might > > > > cause > > > > an early empty return. > > > > > > > > Kind regards, > > > > > > > > Tom > > > > > > > > > > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai > > > > wrote: > > > > > > > > > Thanks for your sharing Matthias. I agree that is inde
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for providing more details. Adding a config might be the path of least resistance... I am fine with that. -Matthias On 2/4/21 9:42 AM, Chia-Ping Tsai wrote: >> vvv >> long_poll.mode: return_on_records|return_on_response > > This idea LGTM. It not only makes minimum changes to current behavior but > also works for KIP-695. > > On 2021/02/04 16:07:11, John Roesler wrote: >> Hi Matthias, Chia-Ping, and Tom, >> >> Thanks for the thoughtful replies! >> >> Re: poll(~forever~) to block indefinitely on records: >> Thanks for your diligence, Chia-Ping. While I wouldn't >> personally recommend for anyone to write code that blocks >> forever on I/O, I do agree this is something that "real >> people" may want to do. >> >> Just a note for the record, this approach should only be >> used in conjunction with a manual assignment. If people are >> using a group subscription, they're setting themselves up to >> get kicked out of the group when there is low volume of >> updates on the topic. And then, when they get kicked out, >> they will never know it because they're just going to be >> blocked in `poll()` the whole time. >> >> However, if you don't participate in a group and just: >> 1 assign(partitions) >> 2 poll(forever), >> you should indeed expect to return from poll only when you >> have records. >> >> This possibility is the turning point for me. I'd like to >> alter my proposal to an opt-in config, detailed below. >> >> Re: Javadoc: >> Thanks for pointing that out. It does seem like, if we do >> decide to change behavior, we should adjust the Javadoc to >> say so. That was an oversight on my part, and I daresay that >> if I had done that initially, it would have saved Rajini >> from having to dig into the code to pinpoint the cause of >> those test failures. >> >> Re: PollOptions: >> I actually like this option quite a bit. 
It seems like this >> would be warranted if we expect someone to want to use the >> same Consumer instance in both "return on metadata or >> records" and "return on only records" mode. Otherwise, we >> might as well introduce a new config. >> >> It also seems like the behavior I proposed in this KIP is >> somewhat "advanced", so I could certainly see leaving it off >> by default and offering an opt-in config. >> >> How does everyone feel about this opt-in config: >> >> vvv >> long_poll.mode: return_on_records|return_on_response >> >> doc: >> * return_on_records: (default) a call to >> Consumer#poll(timeout) will block up to the timeout and >> return early if records are received. >> * return_on_response: a call to Consumer#poll(timeout) will >> block up to the timeout and return early if any fetch >> response is received. Use this option to get updates from >> Consumer#metadata() even if Consumer#records() is empty. >> ^^^ >> >> Thanks, >> John >> >> On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote: >>> Hi, >>> >>> The Javadoc for KafkaConsumer#poll() includes the following: >>> >>> * This method returns immediately if there are records available. >>> *Otherwise, it will await the passed timeout.* * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks. >>> >>> In other words: If the method returns before the timeout there must be >>> records in the method result. After the timeout has passed there may be no >>> records. It might block for longer than the timeout. So I think returning >>> with empty records before at least the given timeout has passed breaks that >>> contract. >>> >>> A not-much-prettier alternative to adding a new >>> pollForRecordsOrMetadata(Duration) method could be overloading poll() to >>> take an additional parameter which controlled whether an early return with >>> empty records was allowed. 
Or a `poll(PollOptions)`. In the long run it >>> could be a mistake to include in the method name exactly what might cause >>> an early empty return. >>> >>> Kind regards, >>> >>> Tom >>> >>> >>> On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai wrote: >>> Thanks for your sharing Matthias. I agree that is indeed an anti-pattern to assume poll() returns data or not. However, I check all usages of poll() in code base. There is an interesting use case - poll(a bigger timeout) - it implies that callers want to block poll()(forever) unless there are available data. [1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443 [2] https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232 Hence, I start to worry client code like aforementioned cases get broken due to behavior change :( On 2021/02/03 22:59:09, "Matthias J. Sax" wrote: > Thanks for your email John. > > I agree tha
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
> vvv > long_poll.mode: return_on_records|return_on_response This idea LGTM. It not only makes minimal changes to current behavior but also works for KIP-695. On 2021/02/04 16:07:11, John Roesler wrote: > Hi Matthias, Chia-Ping, and Tom, > > Thanks for the thoughtful replies! > > Re: poll(~forever~) to block indefinitely on records: > Thanks for your diligence, Chia-Ping. While I wouldn't > personally recommend for anyone to write code that blocks > forever on I/O, I do agree this is something that "real > people" may want to do. > > Just a note for the record, this approach should only be > used in conjunction with a manual assignment. If people are > using a group subscription, they're setting themselves up to > get kicked out of the group when there is low volume of > updates on the topic. And then, when they get kicked out, > they will never know it because they're just going to be > blocked in `poll()` the whole time. > > However, if you don't participate in a group and just: > 1 assign(partitions) > 2 poll(forever), > you should indeed expect to return from poll only when you > have records. > > This possibility is the turning point for me. I'd like to > alter my proposal to an opt-in config, detailed below. > > Re: Javadoc: > Thanks for pointing that out. It does seem like, if we do > decide to change behavior, we should adjust the Javadoc to > say so. That was an oversight on my part, and I daresay that > if I had done that initially, it would have saved Rajini > from having to dig into the code to pinpoint the cause of > those test failures. > > Re: PollOptions: > I actually like this option quite a bit. It seems like this > would be warranted if we expect someone to want to use the > same Consumer instance in both "return on metadata or > records" and "return on only records" mode. Otherwise, we > might as well introduce a new config. 
> > It also seems like the behavior I proposed in this KIP is > somewhat "advanced", so I could certainly see leaving it off > by default and offering an opt-in config. > > How does everyone feel about this opt-in config: > > vvv > long_poll.mode: return_on_records|return_on_response > > doc: > * return_on_records: (default) a call to > Consumer#poll(timeout) will block up to the timeout and > return early if records are received. > * return_on_response: a call to Consumer#poll(timeout) will > block up to the timeout and return early if any fetch > response is received. Use this option to get updates from > Consumer#metadata() even if Consumer#records() is empty. > ^^^ > > Thanks, > John > > On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote: > > Hi, > > > > The Javadoc for KafkaConsumer#poll() includes the following: > > > > * This method returns immediately if there are records available. > > *Otherwise, > > > it will await the passed timeout.* > > > * If the timeout expires, an empty record set will be returned. Note that > > > this method may block beyond the > > > * timeout in order to execute custom {@link ConsumerRebalanceListener} > > > callbacks. > > > > > > > In other words: If the method returns before the timeout there must be > > records in the method result. After the timeout has passed there may be no > > records. It might block for longer than the timeout. So I think returning > > with empty records before at least the given timeout has passed breaks that > > contract. > > > > A not-much-prettier alternative to adding a new > > pollForRecordsOrMetadata(Duration) method could be overloading poll() to > > take an additional parameter which controlled whether an early return with > > empty records was allowed. Or a `poll(PollOptions)`. In the long run it > > could be a mistake to include in the method name exactly what might cause > > an early empty return. 
> > > > Kind regards, > > > > Tom > > > > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai wrote: > > > > > Thanks for your sharing Matthias. I agree that is indeed an anti-pattern > > > to assume poll() returns data or not. > > > > > > However, I check all usages of poll() in code base. There is an > > > interesting use case - poll(a bigger timeout) - it implies that callers > > > want to block poll()(forever) unless there are available data. > > > > > > [1] > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443 > > > [2] > > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232 > > > > > > Hence, I start to worry client code like aforementioned cases get broken > > > due to behavior change :( > > > > > > On 2021/02/03 22:59:09, "Matthias J. Sax" wrote: > > > > Thanks for your email John. > > > > > > > > I agree that it seems to be an anti-pattern to write code that makes > > > > assumptions if poll() returns data or not. Thus, we should fix-forward > > > > the system test from my point of view. >
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hi Matthias, Chia-Ping, and Tom, Thanks for the thoughtful replies! Re: poll(~forever~) to block indefinitely on records: Thanks for your diligence, Chia-Ping. While I wouldn't personally recommend that anyone write code that blocks forever on I/O, I do agree this is something that "real people" may want to do. Just a note for the record, this approach should only be used in conjunction with a manual assignment. If people are using a group subscription, they're setting themselves up to get kicked out of the group when there is a low volume of updates on the topic. And then, when they get kicked out, they will never know it because they're just going to be blocked in `poll()` the whole time. However, if you don't participate in a group and just: 1 assign(partitions) 2 poll(forever), you should indeed expect to return from poll only when you have records. This possibility is the turning point for me. I'd like to alter my proposal to an opt-in config, detailed below. Re: Javadoc: Thanks for pointing that out. It does seem like, if we do decide to change behavior, we should adjust the Javadoc to say so. That was an oversight on my part, and I daresay that if I had done that initially, it would have saved Rajini from having to dig into the code to pinpoint the cause of those test failures. Re: PollOptions: I actually like this option quite a bit. It seems like this would be warranted if we expect someone to want to use the same Consumer instance in both "return on metadata or records" and "return on only records" mode. Otherwise, we might as well introduce a new config. It also seems like the behavior I proposed in this KIP is somewhat "advanced", so I could certainly see leaving it off by default and offering an opt-in config. How does everyone feel about this opt-in config: vvv long_poll.mode: return_on_records|return_on_response doc: * return_on_records: (default) a call to Consumer#poll(timeout) will block up to the timeout and return early if records are received. 
* return_on_response: a call to Consumer#poll(timeout) will block up to the timeout and return early if any fetch response is received. Use this option to get updates from Consumer#metadata() even if Consumer#records() is empty. ^^^ Thanks, John On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote: > Hi, > > The Javadoc for KafkaConsumer#poll() includes the following: > > * This method returns immediately if there are records available. *Otherwise, > > it will await the passed timeout.* > > * If the timeout expires, an empty record set will be returned. Note that > > this method may block beyond the > > * timeout in order to execute custom {@link ConsumerRebalanceListener} > > callbacks. > > > > In other words: If the method returns before the timeout there must be > records in the method result. After the timeout has passed there may be no > records. It might block for longer than the timeout. So I think returning > with empty records before at least the given timeout has passed breaks that > contract. > > A not-much-prettier alternative to adding a new > pollForRecordsOrMetadata(Duration) method could be overloading poll() to > take an additional parameter which controlled whether an early return with > empty records was allowed. Or a `poll(PollOptions)`. In the long run it > could be a mistake to include in the method name exactly what might cause > an early empty return. > > Kind regards, > > Tom > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai wrote: > > > Thanks for your sharing Matthias. I agree that is indeed an anti-pattern > > to assume poll() returns data or not. > > > > However, I check all usages of poll() in code base. There is an > > interesting use case - poll(a bigger timeout) - it implies that callers > > want to block poll()(forever) unless there are available data. 
> > > > [1] > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443 > > [2] > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232 > > > > Hence, I start to worry client code like aforementioned cases get broken > > due to behavior change :( > > > > On 2021/02/03 22:59:09, "Matthias J. Sax" wrote: > > > Thanks for your email John. > > > > > > I agree that it seems to be an anti-pattern to write code that makes > > > assumptions if poll() returns data or not. Thus, we should fix-forward > > > the system test from my point of view. > > > > > > From my understanding, the impact of KIP-695 is that we might return > > > early from poll() (ie, before the timeout passed) with no data, only if > > > an empty fetch request comes back and there is no other fetch request > > > that did return data. Thus, for most cases, poll() should still return > > > early and provide data. -- Thus, I have no concerns with the slight > > > behavior change. > > > > > > Would be good to get input from others about
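A minimal sketch of how the two proposed `long_poll.mode` values would differ, under loud assumptions: `long_poll.mode` is only a proposal in this thread, and `LongPollModeSketch`, `LongPollMode`, and the queue of simulated fetch responses are hypothetical names for illustration, not Kafka APIs. Each queue entry stands for one fetch response (possibly with zero records), and an exhausted queue stands in for the timeout expiring.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation only; none of these names exist in kafka-clients.
class LongPollModeSketch {
    enum LongPollMode { RETURN_ON_RECORDS, RETURN_ON_RESPONSE }

    // Consumes simulated fetch responses until the mode allows a return.
    // An empty queue models the poll timeout expiring.
    static List<String> poll(Deque<List<String>> pendingResponses, LongPollMode mode) {
        while (!pendingResponses.isEmpty()) {
            List<String> records = pendingResponses.removeFirst();
            // return_on_records: only a response carrying records ends the poll early.
            // return_on_response: any response, even an empty one, ends the poll early.
            if (!records.isEmpty() || mode == LongPollMode.RETURN_ON_RESPONSE) {
                return records;
            }
        }
        return List.of(); // "timeout": nothing qualified before responses ran out
    }

    public static void main(String[] args) {
        Deque<List<String>> first = new ArrayDeque<>();
        first.add(List.of());      // an empty fetch response (metadata only)
        first.add(List.of("r1"));  // a later response that carries a record
        System.out.println(poll(first, LongPollMode.RETURN_ON_RECORDS));   // prints [r1]

        Deque<List<String>> second = new ArrayDeque<>();
        second.add(List.of());
        second.add(List.of("r1"));
        System.out.println(poll(second, LongPollMode.RETURN_ON_RESPONSE)); // prints []
    }
}
```

In the second call the record is still pending after the early return, which is the case the `poll(forever)` callers cited above would not expect.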
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hi, The Javadoc for KafkaConsumer#poll() includes the following: * This method returns immediately if there are records available. *Otherwise, > it will await the passed timeout.* > * If the timeout expires, an empty record set will be returned. Note that > this method may block beyond the > * timeout in order to execute custom {@link ConsumerRebalanceListener} > callbacks. > In other words: If the method returns before the timeout there must be records in the method result. After the timeout has passed there may be no records. It might block for longer than the timeout. So I think returning with empty records before at least the given timeout has passed breaks that contract. A not-much-prettier alternative to adding a new pollForRecordsOrMetadata(Duration) method could be overloading poll() to take an additional parameter which controlled whether an early return with empty records was allowed. Or a `poll(PollOptions)`. In the long run it could be a mistake to include in the method name exactly what might cause an early empty return. Kind regards, Tom On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai wrote: > Thanks for your sharing Matthias. I agree that is indeed an anti-pattern > to assume poll() returns data or not. > > However, I check all usages of poll() in code base. There is an > interesting use case - poll(a bigger timeout) - it implies that callers > want to block poll()(forever) unless there are available data. > > [1] > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443 > [2] > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232 > > Hence, I start to worry client code like aforementioned cases get broken > due to behavior change :( > > On 2021/02/03 22:59:09, "Matthias J. Sax" wrote: > > Thanks for your email John. > > > > I agree that it seems to be an anti-pattern to write code that makes > > assumptions if poll() returns data or not. 
Thus, we should fix-forward > > the system test from my point of view. > > > > From my understanding, the impact of KIP-695 is that we might return > > early from poll() (ie, before the timeout passed) with no data, only if > > an empty fetch request comes back and there is no other fetch request > > that did return data. Thus, for most cases, poll() should still return > > early and provide data. -- Thus, I have no concerns with the slight > > behavior change. > > > > Would be good to get input from others about this question though. > > > > > > -Matthias > > > > > > On 2/3/21 10:06 AM, John Roesler wrote: > > > Hello again all, > > > > > > I'm resurrecting this thread to discuss an issue that has > > > come up after merging the code for this KIP. > > > > > > The issue is that some of the system tests need to be > > > updated in the same way that this integration test needed to > > > be updated: > > > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888 > > > > > > This issue was reported here: > > > https://issues.apache.org/jira/browse/KAFKA-12268 > > > and there is some preliminary discussion here: > > > https://github.com/apache/kafka/pull/10022 > > > > > > First, let me offer my apologies for failing to catch this > > > before the merge. I'm sorry that it became Rajini's work to > > > track down the cause of the failure, when it was my > > > responsibility to ensure the feature was merged safely. > > > > > > To recap the situation: > > > Consumer#poll(Duration) will now return before the duration > > > expires even if there are no records returned if there is > > > some returned metadata. > > > > > > This behavior was important for KIP-695. 
In the situation > > > where we get no records back for some partition, Streams > > > needs to have the freshest possible information about > > > whether there are no new records on the broker, or whether > > > there are records on the broker that we still need to fetch. > > > If that's not clear, the KIP contains the full story. > > > > > > It's definitely a behavior change, but our rationale was > > > that it's an acceptable behavior change. Our big alternative > > > is to add a _new_ method to Consumer to > > > pollForRecordsOrMetadata(Duration) or something. > > > > > > It seems unreliable to expect the broker to return a > > > particular record within a particular timeout in general, > > > which is what these tests are doing. The broker can decide > > > for several reasons not to return data for a partition, but > > > return data for another partition instead. > > > > > > It seems like the only case where you might reasonably try > > > to rely on that is in a test, where you first write a record > > > to a partition, then you assign only that one partition to a > > > consumer, then you poll on the consumer, expecting it to > > > return the data you just wrote. > > > > > > So the $10 question here is whether we should
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern to make assumptions about whether poll() returns data. However, I checked all usages of poll() in the code base. There is an interesting use case, poll(a bigger timeout), which implies that callers want poll() to block (forever) unless data is available. [1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443 [2] https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232 Hence, I am starting to worry that client code like the aforementioned cases will get broken by the behavior change :( On 2021/02/03 22:59:09, "Matthias J. Sax" wrote: > Thanks for your email John. > > I agree that it seems to be an anti-pattern to write code that makes > assumptions if poll() returns data or not. Thus, we should fix-forward > the system test from my point of view. > > From my understanding, the impact of KIP-695 is that we might return > early from poll() (ie, before the timeout passed) with no data, only if > an empty fetch request comes back and there is no other fetch request > that did return data. Thus, for most cases, poll() should still return > early and provide data. -- Thus, I have no concerns with the slight > behavior change. > > Would be good to get input from others about this question though. > > > -Matthias > > > On 2/3/21 10:06 AM, John Roesler wrote: > > Hello again all, > > > > I'm resurrecting this thread to discuss an issue that has > > come up after merging the code for this KIP. 
> > > > The issue is that some of the system tests need to be > > updated in the same way that this integration test needed to > > be updated: > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888 > > > > This issue was reported here: > > https://issues.apache.org/jira/browse/KAFKA-12268 > > and there is some preliminary discussion here: > > https://github.com/apache/kafka/pull/10022 > > > > First, let me offer my apologies for failing to catch this > > before the merge. I'm sorry that it became Rajini's work to > > track down the cause of the failure, when it was my > > responsibility to ensure the feature was merged safely. > > > > To recap the situation: > > Consumer#poll(Duration) will now return before the duration > > expires even if there are no records returned if there is > > some returned metadata. > > > > This behavior was important for KIP-695. In the situation > > where we get no records back for some partition, Streams > > needs to have the freshest possible information about > > whether there are no new records on the broker, or whether > > there are records on the broker that we still need to fetch. > > If that's not clear, the KIP contains the full story. > > > > It's definitely a behavior change, but our rationale was > > that it's an acceptable behavior change. Our big alternative > > is to add a _new_ method to Consumer to > > pollForRecordsOrMetadata(Duration) or something. > > > > It seems unreliable to expect the broker to return a > > particular record within a particular timeout in general, > > which is what these tests are doing. The broker can decide > > for several reasons not to return data for a partition, but > > return data for another partition instead. 
> > > > It seems like the only case where you might reasonably try > > to rely on that is in a test, where you first write a record > > to a partition, then you assign only that one partition to a > > consumer, then you poll on the consumer, expecting it to > > return the data you just wrote. > > > > So the $10 question here is whether we should support this > > apparently artificial (testing-only) use case to the point > > where it's worth adding a whole new method to the Consumer > > interface. > > > > Thanks all, > > John > > > > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote: > >> Thanks Jason, > >> > >> We would only return the metadata for the latest fetches. > >> So, if someone wanted to use this to lazily maintain a > >> client-side metadata map for all partitions, they'd have to > >> store it separately and merge in new updates as they arrive. > >> > >> This way: > >> 1. We don't need to increase the complexity of the client by > >> storing that metadata > >> 2. Users will be able to treat all returned metadata as > >> "fresh" without having to reason about the timestamps. > >> 3. All parts of the returned ConsumerRecords object have the > >> same lifecycle: all the data and metadata are the results of > >> the most recent round of fetch responses that had not been > >> previously polled. > >> > >> Does that seem sensible to you? I'll update the KIP to > >> clarify this. > >> > >> Thanks, > >> -John > >> > >> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote: > >>> Hi John, > >>> > >>> Just one question. It wasn't very clear to me exactly when the metadata > >>> would be re
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for your email John. I agree that it seems to be an anti-pattern to write code that makes assumptions if poll() returns data or not. Thus, we should fix-forward the system test from my point of view. From my understanding, the impact of KIP-695 is that we might return early from poll() (ie, before the timeout passed) with no data, only if an empty fetch request comes back and there is no other fetch request that did return data. Thus, for most cases, poll() should still return early and provide data. -- Thus, I have no concerns with the slight behavior change. Would be good to get input from others about this question though. -Matthias On 2/3/21 10:06 AM, John Roesler wrote: > Hello again all, > > I'm resurrecting this thread to discuss an issue that has > come up after merging the code for this KIP. > > The issue is that some of the system tests need to be > updated in the same way that this integration test needed to > be updated: > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888 > > This issue was reported here: > https://issues.apache.org/jira/browse/KAFKA-12268 > and there is some preliminary discussion here: > https://github.com/apache/kafka/pull/10022 > > First, let me offer my apologies for failing to catch this > before the merge. I'm sorry that it became Rajini's work to > track down the cause of the failure, when it was my > responsibility to ensure the feature was merged safely. > > To recap the situation: > Consumer#poll(Duration) will now return before the duration > expires even if there are no records returned if there is > some returned metadata. > > This behavior was important for KIP-695. In the situation > where we get no records back for some partition, Streams > needs to have the freshest possible information about > whether there are no new records on the broker, or whether > there are records on the broker that we still need to fetch. 
> If that's not clear, the KIP contains the full story. > > It's definitely a behavior change, but our rationale was > that it's an acceptable behavior change. Our big alternative > is to add a _new_ method to Consumer to > pollForRecordsOrMetadata(Duration) or something. > > It seems unreliable to expect the broker to return a > particular record within a particular timeout in general, > which is what these tests are doing. The broker can decide > for several reasons not to return data for a partition, but > return data for another partition instead. > > It seems like the only case where you might reasonably try > to rely on that is in a test, where you first write a record > to a partition, then you assign only that one partition to a > consumer, then you poll on the consumer, expecting it to > return the data you just wrote. > > So the $10 question here is whether we should support this > apparently artificial (testing-only) use case to the point > where it's worth adding a whole new method to the Consumer > interface. > > Thanks all, > John > > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote: >> Thanks Jason, >> >> We would only return the metadata for the latest fetches. >> So, if someone wanted to use this to lazily maintain a >> client-side metadata map for all partitions, they'd have to >> store it separately and merge in new updates as they arrive. >> >> This way: >> 1. We don't need to increase the complexity of the client by >> storing that metadata >> 2. Users will be able to treat all returned metadata as >> "fresh" without having to reason about the timestamps. >> 3. All parts of the returned ConsumerRecords object have the >> same lifecycle: all the data and metadata are the results of >> the most recent round of fetch responses that had not been >> previously polled. >> >> Does that seem sensible to you? I'll update the KIP to >> clarify this. 
>> >> Thanks, >> -John >> >> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote: >>> Hi John, >>> >>> Just one question. It wasn't very clear to me exactly when the metadata >>> would be returned in `ConsumerRecords`. Would we /always/ include the >>> metadata for all partitions that are assigned, or would it be based on the >>> latest fetches? >>> >>> Thanks, >>> Jason >>> >>> On Fri, Dec 11, 2020 at 4:07 PM John Roesler wrote: >>> Thanks, Guozhang! All of your feedback sounds good to me. I’ll update the KIP when I am able. 3) I believe it is the position after the fetch, but I will confirm. I think omitting position may render beginning and end offsets useless as well, which leaves only lag. That would be fine with me, but it also seems nice to supply this extra metadata since it is well defined and probably handy for others. Therefore, I’d go the route of specifying the exact semantics and keeping it. Thanks for the review, John On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote: > Hello John, >>
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello again all, I'm resurrecting this thread to discuss an issue that has come up after merging the code for this KIP. The issue is that some of the system tests need to be updated in the same way that this integration test needed to be updated: https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888 This issue was reported here: https://issues.apache.org/jira/browse/KAFKA-12268 and there is some preliminary discussion here: https://github.com/apache/kafka/pull/10022 First, let me offer my apologies for failing to catch this before the merge. I'm sorry that it became Rajini's work to track down the cause of the failure, when it was my responsibility to ensure the feature was merged safely. To recap the situation: Consumer#poll(Duration) will now return before the duration expires even if there are no records returned if there is some returned metadata. This behavior was important for KIP-695. In the situation where we get no records back for some partition, Streams needs to have the freshest possible information about whether there are no new records on the broker, or whether there are records on the broker that we still need to fetch. If that's not clear, the KIP contains the full story. It's definitely a behavior change, but our rationale was that it's an acceptable behavior change. Our big alternative is to add a _new_ method to Consumer to pollForRecordsOrMetadata(Duration) or something. It seems unreliable to expect the broker to return a particular record within a particular timeout in general, which is what these tests are doing. The broker can decide for several reasons not to return data for a partition, but return data for another partition instead. 
It seems like the only case where you might reasonably try to rely on that is in a test, where you first write a record to a partition, then you assign only that one partition to a consumer, then you poll on the consumer, expecting it to return the data you just wrote. So the $10 question here is whether we should support this apparently artificial (testing-only) use case to the point where it's worth adding a whole new method to the Consumer interface. Thanks all, John On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote: > Thanks Jason, > > We would only return the metadata for the latest fetches. > So, if someone wanted to use this to lazily maintain a > client-side metadata map for all partitions, they'd have to > store it separately and merge in new updates as they arrive. > > This way: > 1. We don't need to increase the complexity of the client by > storing that metadata > 2. Users will be able to treat all returned metadata as > "fresh" without having to reason about the timestamps. > 3. All parts of the returned ConsumerRecords object have the > same lifecycle: all the data and metadata are the results of > the most recent round of fetch responses that had not been > previously polled. > > Does that seem sensible to you? I'll update the KIP to > clarify this. > > Thanks, > -John > > On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote: > > Hi John, > > > > Just one question. It wasn't very clear to me exactly when the metadata > > would be returned in `ConsumerRecords`. Would we /always/ include the > > metadata for all partitions that are assigned, or would it be based on the > > latest fetches? > > > > Thanks, > > Jason > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler wrote: > > > > > Thanks, Guozhang! > > > > > > All of your feedback sounds good to me. I’ll update the KIP when I am > > > able. > > > > > > 3) I believe it is the position after the fetch, but I will confirm. 
I > > > think omitting position may render beginning and end offsets useless as > > > well, which leaves only lag. That would be fine with me, but it also seems > > > nice to supply this extra metadata since it is well defined and probably > > > handy for others. Therefore, I’d go the route of specifying the exact > > > semantics and keeping it. > > > > > > Thanks for the review, > > > John > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote: > > > > Hello John, > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also the POC PR, > > > > here are some minor comments: > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keep getting > > > updated, > > > > and we do not create a new object but just update the values in-place, > > > > so > > > > maybe calling it `lastUpdateTimstamp` is better? > > > > > > > > 2) It will be great to verify in javadocs that the new API > > > > "ConsumerRecords#metadata(): Map" may return a > > > > superset of TopicPartitions than the existing API that returns the data > > > by > > > > partitions, in case users assume their map key-entries would always be > > > the > > > > same. > > > > > > > > 3) The "position()" API of the call needs better clarification: is it > > > > the > > > >
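To illustrate the fix-forward pattern discussed above for tests that expect a specific record: rather than asserting on a single poll(Duration) call, loop until records arrive or an overall deadline passes. This is only a minimal sketch under stated assumptions. The `poller` argument is a hypothetical stand-in for something like `consumer.poll(remaining)`, and `PollUntilRecords` / `pollUntilRecords` are illustrative names, not part of the Consumer interface or of the KIP.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

public class PollUntilRecords {

    /**
     * Calls the poller repeatedly until it returns a non-empty batch or the
     * overall timeout elapses. An empty batch returned before the timeout
     * (the case described in this thread) is simply retried with the
     * remaining time. Returns an empty list if the deadline passes first.
     *
     * 'poller' is a hypothetical stand-in for a call like consumer.poll(Duration).
     */
    public static <T> List<T> pollUntilRecords(Function<Duration, List<T>> poller,
                                               Duration timeout) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return List.of(); // overall deadline reached without any records
            }
            List<T> batch = poller.apply(Duration.ofNanos(remainingNanos));
            if (!batch.isEmpty()) {
                return batch;
            }
            // Empty batch: the poll returned early with no data, so try again.
        }
    }

    public static void main(String[] args) {
        // Simulated consumer: two early, empty returns followed by one record.
        Deque<List<String>> responses = new ArrayDeque<>();
        responses.add(List.of());
        responses.add(List.of());
        responses.add(List.of("record-1"));

        Function<Duration, List<String>> fakePoll = remaining -> {
            List<String> next = responses.poll();
            return next == null ? List.of() : next;
        };

        List<String> got = pollUntilRecords(fakePoll, Duration.ofSeconds(5));
        System.out.println(got); // prints [record-1]
    }
}
```

With a real consumer the poll call itself blocks, so the loop does not spin; the stand-in here returns immediately only to keep the example runnable without a broker.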
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello all, Thanks to all who participated in the discussion and vote. I'm closing the vote now and marking KIP-695 as accepted: * 4 binding +1 (Guozhang, Bill, Matthias, and myself) * 2 non-binding +1 (Bruno and Walker) The PRs will follow shortly. Thanks, -John On Fri, 2020-12-18 at 11:53 -0800, Matthias J. Sax wrote: > Sorry that I am late to the game. > > +1 (binding) > > We always knew about this gap when we did KIP-353, and thus, I am glad > that we finally address it. > > > -Matthias > > On 12/18/20 10:16 AM, John Roesler wrote: > > Thanks for your question, Ismael, > > > > Are you concerned about the consumer performance, streams performance or > > both? > > > > On the consumer side, this is only creating one extra struct for each > > response partition to represent the metadata that we already have access to > > internally. I don’t think this would have a measurable performance impact. > > > > On the streams side, I would definitely like to ensure that performance > > doesn’t decrease for users. I ran our internal benchmarks on my POC branch > > and found that the measured throughput across all operations is within the > > 99% confidence interval of the baseline performance of trunk. I also > > deployed our internal soak test from my POC branch, which includes a join > > operation, and I observe that the throughput of that soak cluster is > > identical to the soak for trunk. > > > > This result is to be expected, since the semantics improve the here would > > only kick in for Join/Merge operations where Streams is processing faster > > than it can fetch some partitions on average. I would expect Streams to > > catch up to the fetches occasionally, but not on average. > > > > It’s also worth noting that we have seen increasing numbers of users > > complaining of incorrect join results due to the current implementation. 
> > Even if the new implementation showed a modest drop in performance, I would > > advocate for correct results over top performance by default. > > > > Finally, to assuage any lingering concerns, there is a configuration > > available to completely disable the new semantics proposed here and revert > > to the prior behavior. > > > > These details seem worth mentioning in the KIP. I’ll update the document > > shortly. > > > > Thanks again, > > John > > > > On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote: > > > Hi John, > > > > > > It would be good to make sure these changes have no measurable performance > > > impact for the use cases that don't need it. Have we given this some > > > thought? And what would be the perf testing strategy to verify this? > > > > > > Ismael > > > > > > On Fri, Dec 18, 2020 at 8:39 AM John Roesler wrote: > > > > > > > Thanks for the votes and reviews, all. I'll wait for a > > > > response from Jason before closing the vote, since he asked > > > > for clarification. > > > > > > > > The present count is: > > > > * 3 binding +1 (Guozhang, Bill, and myself) > > > > * 2 non-binding +1 (Bruno and Walker) > > > > > > > > I have updated the KIP document in response to the requests > > > > for clarification: > > > > 1) The new metadata() map actually just contains immutable > > > > Metadata objects representing the metadata received in the > > > > last round of fetch responses, so I decided to stick with > > > > `receivedMetadata`, as that is an accurate representation of > > > > the timestamp's meaning. > > > > > > > > 2) I added a javadoc clarifying that the metadata partitions > > > > may be a superset of the data partitions in the same > > > > ConsumerRecords > > > > > > > > 3) I confirmed that the position we are returning is the > > > > next offset to fetch after the current returned records. 
> > > > This is equivalent to the "current position" of the consumer > > > > after the call to poll() that returns this ConsumerRecords > > > > object > > > > > > > > 4) (Jason's question about whether we include metadata for > > > > all partitions or just the latest fetch responses) I've > > > > clarified the javadoc to state that the metadata is only > > > > what was included in the latest fetches. > > > > > > > > Thanks, > > > > -John > > > > > > > > > > > > On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote: > > > > > Hi John, > > > > > > > > > > I've made a pass over the KIP and I think it will be a good addition. > > > > > > > > > > Modulo Jason's question, I'm a +1 (binding). > > > > > > > > > > Thanks, > > > > > Bill > > > > > > > > > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson > > > > wrote: > > > > > > > > > > > Hi John, > > > > > > > > > > > > Just one question. It wasn't very clear to me exactly when the > > > > > > metadata > > > > > > would be returned in `ConsumerRecords`. Would we /always/ include > > > > > > the > > > > > > metadata for all partitions that are assigned, or would it be based > > > > > > on > > > > the > > > > > > latest fetches? > > > > > > > > > > > > Thanks, > >
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Sorry that I am late to the game. +1 (binding) We always knew about this gap when we did KIP-353, and thus, I am glad that we finally address it. -Matthias On 12/18/20 10:16 AM, John Roesler wrote: > Thanks for your question, Ismael, > > Are you concerned about the consumer performance, streams performance or both? > > On the consumer side, this is only creating one extra struct for each > response partition to represent the metadata that we already have access to > internally. I don’t think this would have a measurable performance impact. > > On the streams side, I would definitely like to ensure that performance > doesn’t decrease for users. I ran our internal benchmarks on my POC branch > and found that the measured throughput across all operations is within the > 99% confidence interval of the baseline performance of trunk. I also deployed > our internal soak test from my POC branch, which includes a join operation, > and I observe that the throughput of that soak cluster is identical to the > soak for trunk. > > This result is to be expected, since the semantics improve the here would > only kick in for Join/Merge operations where Streams is processing faster > than it can fetch some partitions on average. I would expect Streams to catch > up to the fetches occasionally, but not on average. > > It’s also worth noting that we have seen increasing numbers of users > complaining of incorrect join results due to the current implementation. Even > if the new implementation showed a modest drop in performance, I would > advocate for correct results over top performance by default. > > Finally, to assuage any lingering concerns, there is a configuration > available to completely disable the new semantics proposed here and revert to > the prior behavior. > > These details seem worth mentioning in the KIP. I’ll update the document > shortly. 
> > Thanks again, > John > > On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote: >> Hi John, >> >> It would be good to make sure these changes have no measurable performance >> impact for the use cases that don't need it. Have we given this some >> thought? And what would be the perf testing strategy to verify this? >> >> Ismael >> >> On Fri, Dec 18, 2020 at 8:39 AM John Roesler wrote: >> >>> Thanks for the votes and reviews, all. I'll wait for a >>> response from Jason before closing the vote, since he asked >>> for clarification. >>> >>> The present count is: >>> * 3 binding +1 (Guozhang, Bill, and myself) >>> * 2 non-binding +1 (Bruno and Walker) >>> >>> I have updated the KIP document in response to the requests >>> for clarification: >>> 1) The new metadata() map actually just contains immutable >>> Metadata objects representing the metadata received in the >>> last round of fetch responses, so I decided to stick with >>> `receivedMetadata`, as that is an accurate representation of >>> the timestamp's meaning. >>> >>> 2) I added a javadoc clarifying that the metadata partitions >>> may be a superset of the data partitions in the same >>> ConsumerRecords >>> >>> 3) I confirmed that the position we are returning is the >>> next offset to fetch after the current returned records. >>> This is equivalent to the "current position" of the consumer >>> after the call to poll() that returns this ConsumerRecords >>> object >>> >>> 4) (Jason's question about whether we include metadata for >>> all partitions or just the latest fetch responses) I've >>> clarified the javadoc to state that the metadata is only >>> what was included in the latest fetches. >>> >>> Thanks, >>> -John >>> >>> >>> On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote: Hi John, I've made a pass over the KIP and I think it will be a good addition. Modulo Jason's question, I'm a +1 (binding). Thanks, Bill On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson >>> wrote: > Hi John, > > Just one question. 
It wasn't very clear to me exactly when the metadata > would be returned in `ConsumerRecords`. Would we /always/ include the > metadata for all partitions that are assigned, or would it be based on >>> the > latest fetches? > > Thanks, > Jason > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler >>> wrote: > >> Thanks, Guozhang! >> >> All of your feedback sounds good to me. I’ll update the KIP when I am > able. >> >> 3) I believe it is the position after the fetch, but I will confirm. >>> I >> think omitting position may render beginning and end offsets useless >>> as >> well, which leaves only lag. That would be fine with me, but it also > seems >> nice to supply this extra metadata since it is well defined and >>> probably >> handy for others. Therefore, I’d go the route of specifying the exact >> semantics and keeping it. >> >> Thanks for the review, >> John >> >> On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wro
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for your question, Ismael, Are you concerned about the consumer performance, streams performance or both? On the consumer side, this is only creating one extra struct for each response partition to represent the metadata that we already have access to internally. I don’t think this would have a measurable performance impact. On the streams side, I would definitely like to ensure that performance doesn’t decrease for users. I ran our internal benchmarks on my POC branch and found that the measured throughput across all operations is within the 99% confidence interval of the baseline performance of trunk. I also deployed our internal soak test from my POC branch, which includes a join operation, and I observe that the throughput of that soak cluster is identical to the soak for trunk. This result is to be expected, since the semantics improvement here would only kick in for Join/Merge operations where Streams is processing faster than it can fetch some partitions on average. I would expect Streams to catch up to the fetches occasionally, but not on average. It’s also worth noting that we have seen increasing numbers of users complaining of incorrect join results due to the current implementation. Even if the new implementation showed a modest drop in performance, I would advocate for correct results over top performance by default. Finally, to assuage any lingering concerns, there is a configuration available to completely disable the new semantics proposed here and revert to the prior behavior. These details seem worth mentioning in the KIP. I’ll update the document shortly. Thanks again, John On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote: > Hi John, > > It would be good to make sure these changes have no measurable performance > impact for the use cases that don't need it. Have we given this some > thought? And what would be the perf testing strategy to verify this? 
> > Ismael > > On Fri, Dec 18, 2020 at 8:39 AM John Roesler wrote: > > > Thanks for the votes and reviews, all. I'll wait for a > > response from Jason before closing the vote, since he asked > > for clarification. > > > > The present count is: > > * 3 binding +1 (Guozhang, Bill, and myself) > > * 2 non-binding +1 (Bruno and Walker) > > > > I have updated the KIP document in response to the requests > > for clarification: > > 1) The new metadata() map actually just contains immutable > > Metadata objects representing the metadata received in the > > last round of fetch responses, so I decided to stick with > > `receivedMetadata`, as that is an accurate representation of > > the timestamp's meaning. > > > > 2) I added a javadoc clarifying that the metadata partitions > > may be a superset of the data partitions in the same > > ConsumerRecords > > > > 3) I confirmed that the position we are returning is the > > next offset to fetch after the current returned records. > > This is equivalent to the "current position" of the consumer > > after the call to poll() that returns this ConsumerRecords > > object > > > > 4) (Jason's question about whether we include metadata for > > all partitions or just the latest fetch responses) I've > > clarified the javadoc to state that the metadata is only > > what was included in the latest fetches. > > > > Thanks, > > -John > > > > > > On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote: > > > Hi John, > > > > > > I've made a pass over the KIP and I think it will be a good addition. > > > > > > Modulo Jason's question, I'm a +1 (binding). > > > > > > Thanks, > > > Bill > > > > > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson > > wrote: > > > > > > > Hi John, > > > > > > > > Just one question. It wasn't very clear to me exactly when the metadata > > > > would be returned in `ConsumerRecords`. Would we /always/ include the > > > > metadata for all partitions that are assigned, or would it be based on > > the > > > > latest fetches? 
> > > > > > > > Thanks, > > > > Jason > > > > > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler > > wrote: > > > > > > > > > Thanks, Guozhang! > > > > > > > > > > All of your feedback sounds good to me. I’ll update the KIP when I am > > > > able. > > > > > > > > > > 3) I believe it is the position after the fetch, but I will confirm. > > I > > > > > think omitting position may render beginning and end offsets useless > > as > > > > > well, which leaves only lag. That would be fine with me, but it also > > > > seems > > > > > nice to supply this extra metadata since it is well defined and > > probably > > > > > handy for others. Therefore, I’d go the route of specifying the exact > > > > > semantics and keeping it. > > > > > > > > > > Thanks for the review, > > > > > John > > > > > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote: > > > > > > Hello John, > > > > > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also the > > POC > > > > PR, > > > > > > here are some minor comments: > > > > > > > > > > >
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hi John, It would be good to make sure these changes have no measurable performance impact for the use cases that don't need it. Have we given this some thought? And what would be the perf testing strategy to verify this? Ismael On Fri, Dec 18, 2020 at 8:39 AM John Roesler wrote: > Thanks for the votes and reviews, all. I'll wait for a > response from Jason before closing the vote, since he asked > for clarification. > > The present count is: > * 3 binding +1 (Guozhang, Bill, and myself) > * 2 non-binding +1 (Bruno and Walker) > > I have updated the KIP document in response to the requests > for clarification: > 1) The new metadata() map actually just contains immutable > Metadata objects representing the metadata received in the > last round of fetch responses, so I decided to stick with > `receivedMetadata`, as that is an accurate representation of > the timestamp's meaning. > > 2) I added a javadoc clarifying that the metadata partitions > may be a superset of the data partitions in the same > ConsumerRecords > > 3) I confirmed that the position we are returning is the > next offset to fetch after the current returned records. > This is equivalent to the "current position" of the consumer > after the call to poll() that returns this ConsumerRecords > object > > 4) (Jason's question about whether we include metadata for > all partitions or just the latest fetch responses) I've > clarified the javadoc to state that the metadata is only > what was included in the latest fetches. > > Thanks, > -John > > > On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote: > > Hi John, > > > > I've made a pass over the KIP and I think it will be a good addition. > > > > Modulo Jason's question, I'm a +1 (binding). > > > > Thanks, > > Bill > > > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson > wrote: > > > > > Hi John, > > > > > > Just one question. It wasn't very clear to me exactly when the metadata > > > would be returned in `ConsumerRecords`. 
Would we /always/ include the > > > metadata for all partitions that are assigned, or would it be based on > the > > > latest fetches? > > > > > > Thanks, > > > Jason > > > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler > wrote: > > > > > > > Thanks, Guozhang! > > > > > > > > All of your feedback sounds good to me. I’ll update the KIP when I am > > > able. > > > > > > > > 3) I believe it is the position after the fetch, but I will confirm. > I > > > > think omitting position may render beginning and end offsets useless > as > > > > well, which leaves only lag. That would be fine with me, but it also > > > seems > > > > nice to supply this extra metadata since it is well defined and > probably > > > > handy for others. Therefore, I’d go the route of specifying the exact > > > > semantics and keeping it. > > > > > > > > Thanks for the review, > > > > John > > > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote: > > > > > Hello John, > > > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also the > POC > > > PR, > > > > > here are some minor comments: > > > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keep getting > > > > updated, > > > > > and we do not create a new object but just update the values > in-place, > > > so > > > > > maybe calling it `lastUpdateTimstamp` is better? > > > > > > > > > > 2) It will be great to verify in javadocs that the new API > > > > > "ConsumerRecords#metadata(): Map" may > return > > > a > > > > > superset of TopicPartitions than the existing API that returns the > data > > > > by > > > > > partitions, in case users assume their map key-entries would > always be > > > > the > > > > > same. > > > > > > > > > > 3) The "position()" API of the call needs better clarification: is > it > > > the > > > > > current position AFTER the records are returned, or is it BEFORE > the > > > > > records are returned? 
Personally I'd suggest we do not include it > if it > > > > is > > > > > not used anywhere yet just to avoid possible misuage, but I'm fine > if > > > you > > > > > like to keep it still; in that case just clarify its semantics. > > > > > > > > > > > > > > > Other than that,I'm +1 on the KIP as well ! > > > > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson < > wcarl...@confluent.io> > > > > > wrote: > > > > > > > > > > > Thanks for the KIP! > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > walker > > > > > > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna < > br...@confluent.io> > > > > wrote: > > > > > > > > > > > > > Thanks for the KIP, John! > > > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > > > Best, > > > > > > > Bruno > > > > > > > > > > > > > > On 08.12.20 18:03, John Roesler wrote: > > > > > > > > Hello all, > > > > > > > > > > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd > > > > > > > > like to go ahead and call for a vote. > > > > > > > > > > > > > >
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for the votes and reviews, all. I'll wait for a response from Jason before closing the vote, since he asked for clarification. The present count is: * 3 binding +1 (Guozhang, Bill, and myself) * 2 non-binding +1 (Bruno and Walker) I have updated the KIP document in response to the requests for clarification: 1) The new metadata() map actually just contains immutable Metadata objects representing the metadata received in the last round of fetch responses, so I decided to stick with `receivedMetadata`, as that is an accurate representation of the timestamp's meaning. 2) I added a javadoc clarifying that the metadata partitions may be a superset of the data partitions in the same ConsumerRecords 3) I confirmed that the position we are returning is the next offset to fetch after the current returned records. This is equivalent to the "current position" of the consumer after the call to poll() that returns this ConsumerRecords object 4) (Jason's question about whether we include metadata for all partitions or just the latest fetch responses) I've clarified the javadoc to state that the metadata is only what was included in the latest fetches. Thanks, -John On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote: > Hi John, > > I've made a pass over the KIP and I think it will be a good addition. > > Modulo Jason's question, I'm a +1 (binding). > > Thanks, > Bill > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson wrote: > > > Hi John, > > > > Just one question. It wasn't very clear to me exactly when the metadata > > would be returned in `ConsumerRecords`. Would we /always/ include the > > metadata for all partitions that are assigned, or would it be based on the > > latest fetches? > > > > Thanks, > > Jason > > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler wrote: > > > > > Thanks, Guozhang! > > > > > > All of your feedback sounds good to me. I’ll update the KIP when I am > > able. > > > > > > 3) I believe it is the position after the fetch, but I will confirm. 
I > > > think omitting position may render beginning and end offsets useless as > > > well, which leaves only lag. That would be fine with me, but it also > > seems > > > nice to supply this extra metadata since it is well defined and probably > > > handy for others. Therefore, I’d go the route of specifying the exact > > > semantics and keeping it. > > > > > > Thanks for the review, > > > John > > > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote: > > > > Hello John, > > > > > > > > Thanks for the updates! I've made a pass on the KIP and also the POC > > PR, > > > > here are some minor comments: > > > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keep getting > > > updated, > > > > and we do not create a new object but just update the values in-place, > > so > > > > maybe calling it `lastUpdateTimstamp` is better? > > > > > > > > 2) It will be great to verify in javadocs that the new API > > > > "ConsumerRecords#metadata(): Map" may return > > a > > > > superset of TopicPartitions than the existing API that returns the data > > > by > > > > partitions, in case users assume their map key-entries would always be > > > the > > > > same. > > > > > > > > 3) The "position()" API of the call needs better clarification: is it > > the > > > > current position AFTER the records are returned, or is it BEFORE the > > > > records are returned? Personally I'd suggest we do not include it if it > > > is > > > > not used anywhere yet just to avoid possible misuage, but I'm fine if > > you > > > > like to keep it still; in that case just clarify its semantics. > > > > > > > > > > > > Other than that,I'm +1 on the KIP as well ! > > > > > > > > > > > > Guozhang > > > > > > > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson > > > > wrote: > > > > > > > > > Thanks for the KIP! 
> > > > > > > > > > +1 (non-binding) > > > > > > > > > > walker > > > > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna > > > wrote: > > > > > > > > > > > Thanks for the KIP, John! > > > > > > > > > > > > +1 (non-binding) > > > > > > > > > > > > Best, > > > > > > Bruno > > > > > > > > > > > > On 08.12.20 18:03, John Roesler wrote: > > > > > > > Hello all, > > > > > > > > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd > > > > > > > like to go ahead and call for a vote. > > > > > > > > > > > > > > As a reminder, the purpose of KIP-695 to improve on the > > > > > > > "task idling" feature we introduced in KIP-353. This KIP > > > > > > > will allow Streams to offer deterministic time semantics in > > > > > > > join-type topologies. For example, it makes sure that > > > > > > > when you join two topics, that we collate the topics by > > > > > > > timestamp. That was always the intent with task idling (KIP- > > > > > > > 353), but it turns out the previous mechanism couldn't > > > > > > > provide the desired semantics. > > > > > > > > > > > > > > The details are here: > > > > > > > http
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks Jason,

We would only return the metadata for the latest fetches. So, if someone wanted to use this to lazily maintain a client-side metadata map for all partitions, they'd have to store it separately and merge in new updates as they arrive.

This way:
1. We don't need to increase the complexity of the client by storing that metadata.
2. Users will be able to treat all returned metadata as "fresh" without having to reason about the timestamps.
3. All parts of the returned ConsumerRecords object have the same lifecycle: all the data and metadata are the results of the most recent round of fetch responses that had not been previously polled.

Does that seem sensible to you? I'll update the KIP to clarify this.

Thanks,
-John

On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> Hi John,
>
> Just one question. It wasn't very clear to me exactly when the metadata
> would be returned in `ConsumerRecords`. Would we /always/ include the
> metadata for all partitions that are assigned, or would it be based on the
> latest fetches?
>
> Thanks,
> Jason
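The "store it separately and merge in new updates" approach from John's reply can be sketched roughly as follows. This is only an illustration under stated assumptions: `PartitionMetadata` and `MetadataCache` are hypothetical names, partitions are plain strings standing in for `TopicPartition`, and the fields (including lag computed as end offset minus position) mirror the metadata discussed in this thread rather than any released consumer API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the per-partition metadata discussed in this thread. */
final class PartitionMetadata {
    final long receivedTimestamp;
    final long position;
    final long endOffset;

    PartitionMetadata(long receivedTimestamp, long position, long endOffset) {
        this.receivedTimestamp = receivedTimestamp;
        this.position = position;
        this.endOffset = endOffset;
    }

    /** Assumed definition of lag: how far the position is behind the end offset. */
    long lag() {
        return endOffset - position;
    }
}

/** Client-side cache that merges the "fresh" metadata of each poll into a long-lived map. */
final class MetadataCache {
    private final Map<String, PartitionMetadata> latest = new HashMap<>();

    /** Overwrites entries for partitions present in this poll; all others keep their last known value. */
    void merge(Map<String, PartitionMetadata> fromLatestPoll) {
        latest.putAll(fromLatestPoll);
    }

    /** Returns the last known metadata for the partition, or null if none was ever seen. */
    PartitionMetadata get(String partition) {
        return latest.get(partition);
    }

    int size() {
        return latest.size();
    }
}
```

With this shape, only the cache has to reason about staleness (via `receivedTimestamp`); everything returned by a single poll can still be treated as fresh.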
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hi John,

I've made a pass over the KIP and I think it will be a good addition.

Modulo Jason's question, I'm a +1 (binding).

Thanks,
Bill

On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson wrote:
> Hi John,
>
> Just one question. It wasn't very clear to me exactly when the metadata
> would be returned in `ConsumerRecords`. Would we /always/ include the
> metadata for all partitions that are assigned, or would it be based on the
> latest fetches?
>
> Thanks,
> Jason
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hi John,

Just one question. It wasn't very clear to me exactly when the metadata would be returned in `ConsumerRecords`. Would we /always/ include the metadata for all partitions that are assigned, or would it be based on the latest fetches?

Thanks,
Jason

On Fri, Dec 11, 2020 at 4:07 PM John Roesler wrote:
> Thanks, Guozhang!
>
> All of your feedback sounds good to me. I’ll update the KIP when I am able.
>
> 3) I believe it is the position after the fetch, but I will confirm. I
> think omitting position may render beginning and end offsets useless as
> well, which leaves only lag. That would be fine with me, but it also seems
> nice to supply this extra metadata since it is well defined and probably
> handy for others. Therefore, I’d go the route of specifying the exact
> semantics and keeping it.
>
> Thanks for the review,
> John
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks, Guozhang!

All of your feedback sounds good to me. I’ll update the KIP when I am able.

3) I believe it is the position after the fetch, but I will confirm. I think omitting position may render beginning and end offsets useless as well, which leaves only lag. That would be fine with me, but it also seems nice to supply this extra metadata since it is well defined and probably handy for others. Therefore, I’d go the route of specifying the exact semantics and keeping it.

Thanks for the review,
John

On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> Hello John,
>
> Thanks for the updates! I've made a pass on the KIP and also the POC PR,
> here are some minor comments:
>
> 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated,
> and we do not create a new object but just update the values in-place, so
> maybe calling it `lastUpdateTimestamp` is better?
>
> 2) It would be great to clarify in the javadocs that the new API
> "ConsumerRecords#metadata(): Map" may return a superset of the
> TopicPartitions returned by the existing API that returns the data by
> partition, in case users assume their map key-entries would always be the
> same.
>
> 3) The "position()" API of the call needs better clarification: is it the
> current position AFTER the records are returned, or is it BEFORE the
> records are returned? Personally I'd suggest we do not include it if it is
> not used anywhere yet, just to avoid possible misuse, but I'm fine if you'd
> still like to keep it; in that case just clarify its semantics.
>
> Other than that, I'm +1 on the KIP as well!
>
> Guozhang
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Hello John,

Thanks for the updates! I've made a pass on the KIP and also the POC PR, here are some minor comments:

1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated, and we do not create a new object but just update the values in-place, so maybe calling it `lastUpdateTimestamp` is better?

2) It would be great to clarify in the javadocs that the new API "ConsumerRecords#metadata(): Map" may return a superset of the TopicPartitions returned by the existing API that returns the data by partition, in case users assume their map key-entries would always be the same.

3) The "position()" API of the call needs better clarification: is it the current position AFTER the records are returned, or is it BEFORE the records are returned? Personally I'd suggest we do not include it if it is not used anywhere yet, just to avoid possible misuse, but I'm fine if you'd still like to keep it; in that case just clarify its semantics.

Other than that, I'm +1 on the KIP as well!

Guozhang

On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson wrote:
> Thanks for the KIP!
>
> +1 (non-binding)
>
> walker

--
-- Guozhang
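Guozhang's point 2 can be made concrete with a small sketch. It assumes, purely for illustration, that a poll result exposes two maps keyed by partition: one with records and one with metadata (reduced here to a lag value). `PollResultView`, the String partition keys, and both map shapes are hypothetical and not part of any Kafka API. The point is that code iterating the metadata keys must tolerate partitions that have no records in the same poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class PollResultView {
    /**
     * Returns, in sorted order, the partitions that carry metadata in this poll
     * but have no records. A non-empty result shows that the metadata key set
     * is a strict superset of the records key set.
     */
    static List<String> metadataOnlyPartitions(
            Map<String, List<String>> recordsByPartition,
            Map<String, Long> lagByPartition) {
        List<String> result = new ArrayList<>();
        // TreeSet gives a deterministic iteration order over the metadata keys.
        for (String partition : new TreeSet<>(lagByPartition.keySet())) {
            List<String> records = recordsByPartition.get(partition);
            if (records == null || records.isEmpty()) {
                result.add(partition);
            }
        }
        return result;
    }
}
```

A caller that assumed both maps share the same keys would hit a null lookup for every partition this method returns.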
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for the KIP!

+1 (non-binding)

walker

On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna wrote:
> Thanks for the KIP, John!
>
> +1 (non-binding)
>
> Best,
> Bruno
Re: [VOTE] KIP-695: Improve Streams Time Synchronization
Thanks for the KIP, John!

+1 (non-binding)

Best,
Bruno

On 08.12.20 18:03, John Roesler wrote:
> Hello all,
>
> There hasn't been much discussion on KIP-695 so far, so I'd
> like to go ahead and call for a vote.
>
> As a reminder, the purpose of KIP-695 is to improve on the
> "task idling" feature we introduced in KIP-353. This KIP
> will allow Streams to offer deterministic time semantics in
> join-type topologies. For example, it makes sure that
> when you join two topics, we collate the topics by
> timestamp. That was always the intent with task idling
> (KIP-353), but it turns out the previous mechanism couldn't
> provide the desired semantics.
>
> The details are here:
> https://cwiki.apache.org/confluence/x/JSXZCQ
>
> Thanks,
> -John
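The timestamp collation described in the vote call can be illustrated with a minimal sketch. It is not the Streams implementation: `TimestampCollator`, the String input names, and the use of bare timestamps in place of records are all assumptions for illustration. The idea is to pick the input whose next buffered record has the lowest timestamp, and to refuse to pick at all while any input buffer is empty, since an empty buffer might still yield an earlier record later.

```java
import java.util.Deque;
import java.util.Map;

final class TimestampCollator {
    /**
     * Returns the name of the input whose head record has the lowest timestamp,
     * or null if there are no inputs or any input buffer is empty. Returning
     * null means "wait": choosing now could process records out of timestamp order.
     */
    static String nextInput(Map<String, Deque<Long>> bufferedTimestamps) {
        String best = null;
        long bestTimestamp = Long.MAX_VALUE;
        for (Map.Entry<String, Deque<Long>> entry : bufferedTimestamps.entrySet()) {
            Long head = entry.getValue().peekFirst();
            if (head == null) {
                return null; // this input has nothing buffered, so no safe choice exists yet
            }
            if (best == null || head < bestTimestamp) {
                best = entry.getKey();
                bestTimestamp = head;
            }
        }
        return best;
    }
}
```

Whether an empty buffer means "caught up" or merely "not fetched yet" is the question that the lag information discussed in this thread is meant to answer; the sketch sidesteps it by always waiting.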