Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-22 Thread Guozhang Wang
Hello John,

Thanks for the new PR, and sorry for zig-zagging this KIP design. I made a
pass on the PR and it looks good to me overall. Left some minor comments.


Guozhang

On Mon, Feb 22, 2021 at 9:06 AM John Roesler  wrote:

> Hello all,
>
> I have updated the KIP now. The diff is visible here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=165225765&selectedPageVersions=13&selectedPageVersions=12
>
> The PR https://github.com/apache/kafka/pull/10137 is now
> available for reviews.
>
> Thanks so much to you all for your help on this!
> John
>
> On Mon, 2021-02-22 at 10:25 -0600, John Roesler wrote:
> > Thanks, Bill,
> >
> > I was waiting for feedback on the "currentLag" proposal
> > before updating the KIP. Since there haven't been any
> > objections to my new proposal, I'm in the process of
> > updating the KIP document right now.
> >
> > Personally, I still like the original proposal to expose the
> > metadata from the fetch responses as well, but there were
> > too many side-effects of going that route.
> >
> > Once I finish updating the wiki page, I'll ping here again
> > to check for objections.
> >
> > Thank you for taking another look!
> > -John
> >
> > On Mon, 2021-02-22 at 11:19 -0500, Bill Bejeck wrote:
> > > Thanks for the KIP update John,
> > >
> > > The KIP as it stands LGTM.
> > >
> > > One question, in your previous comment you stated that the KIP would
> > > introduce the `currentLag()` method, but the KIP seems to show the
> > > `metadata()` implementation.
> > > FWIW I like the metadata approach as it seems to provide information the
> > > consumer has access to when it receives the fetch response.
> > >
> > > Thanks again.
> > >
> > > Bill
> > >
> > > On Mon, Feb 22, 2021 at 10:51 AM John Roesler  wrote:
> > >
> > > > Hello all,
> > > >
> > > > Since there haven't been any big objections to my proposed
> > > > design fix, I'm updating the KIP now and opening the PR for
> > > > reviews. Hopefully, we can get this merged quickly and
> > > > unblock the system tests.
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > > On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote:
> > > > > Thanks for that idea, Chia-Ping,
> > > > >
> > > > > I agree that it would be nice for users to have a single API
> > > > > to get all kinds of metadata, but I'd rather introduce it in
> > > > > a KIP that would expose more than one piece of metadata. At
> > > > > the moment, I am motivated to keep this as simple as
> > > > > possible, which means only exposing the lag, so it seems
> > > > > better to introduce just "currentLag".
> > > > >
> > > > > A later KIP could introduce some kind of "local metadata"
> > > > > API and deprecate this method without harm. That later KIP
> > > > > would be in a better position to take its time to find a
> > > > > good name, consider which pieces of metadata would be nice
> > > > > to have, etc.
> > > > >
> > > > > Thanks sincerely for all your reviews,
> > > > > John
> > > > >
> > > > > On Wed, 2021-02-17 at 11:54 +, Chia-Ping Tsai wrote:
> > > > > > That new solution is good to me, as it brings fewer changes than
> > > > > > either the options or the config approach.
> > > > > >
> > > > > > One minor question: could we return a composite object rather than
> > > > > > an OptionalLong? For example:
> > > > > >
> > > > > > class Metadata {
> > > > > >   long lag;
> > > > > > }
> > > > > >
> > > > > > There are several pieces of metadata for a partition, and we may
> > > > > > want to expose more information about a partition in the future.
> > > > > > A composite object leaves room to carry more information without
> > > > > > adding more public APIs to Consumer.
> > > > > >
> > > > > > On 2021/02/17 05:21:06, John Roesler  wrote:
> > > > > > > Hello again, all,
> > > > > > >
> > > > > > > Thank you for your feedback and patience. I am hopeful that
> > > > > > > I have been able to come up with a solution that will
> > > > > > > satisfy everyone.
> > > > > > >
> > > > > > > Under a previous design iteration of the desired task idling
> > > > > > > semantics in KIP-695, we did indeed require the behavior
> > > > > > > change, but while I was considering all of your feedback, I
> > > > > > > realized that that requirement is no longer present.
> > > > > > >
> > > > > > > Just a little more detail in case you are curious: I had
> > > > > > > initially wanted to make the task idling semantics
> > > > > > > absolutely free of weird timing effects based on how
> > > > > > > frequently Streams happens to call poll, so I built in a
> > > > > > > mechanism that would force Streams to get back a _fresh_ lag
> > > > > > > response from poll before proceeding to enforce processing.
> > > > > > > However, this resulted in a severe performance degradation,
> > > > > > > so I backed off to use a cache of the lag metadata.
> > > > > > >
> > > > > > > What I realized just now is that under this change, there's
> > > > > > > no longer a need to return metadata (or change beha

2021-02-22 Thread John Roesler
Hello all,

I have updated the KIP now. The diff is visible here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=165225765&selectedPageVersions=13&selectedPageVersions=12

The PR https://github.com/apache/kafka/pull/10137 is now
available for reviews.

Thanks so much to you all for your help on this!
John

On Mon, 2021-02-22 at 10:25 -0600, John Roesler wrote:
> Thanks, Bill,
> 
> I was waiting for feedback on the "currentLag" proposal
> before updating the KIP. Since there haven't been any
> objections to my new proposal, I'm in the process of
> updating the KIP document right now.
> 
> Personally, I still like the original proposal to expose the
> metadata from the fetch responses as well, but there were
> too many side-effects of going that route.
> 
> Once I finish updating the wiki page, I'll ping here again
> to check for objections.
> 
> Thank you for taking another look!
> -John
> 
> On Mon, 2021-02-22 at 11:19 -0500, Bill Bejeck wrote:
> > Thanks for the KIP update John,
> > 
> > The KIP as it stands LGTM.
> > 
> > One question, in your previous comment you stated that the KIP would
> > introduce the `currentLag()` method, but the KIP seems to show the
> > `metadata()` implementation.
> > FWIW I like the metadata approach as it seems to provide information the
> > consumer has access to when it receives the fetch response.
> > 
> > Thanks again.
> > 
> > Bill
> > 
> > On Mon, Feb 22, 2021 at 10:51 AM John Roesler  wrote:
> > 
> > > Hello all,
> > > 
> > > Since there haven't been any big objections to my proposed
> > > design fix, I'm updating the KIP now and opening the PR for
> > > reviews. Hopefully, we can get this merged quickly and
> > > unblock the system tests.
> > > 
> > > Thanks,
> > > John
> > > 
> > > On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote:
> > > > Thanks for that idea, Chia-Ping,
> > > > 
> > > > I agree that it would be nice for users to have a single API
> > > > to get all kinds of metadata, but I'd rather introduce it in
> > > > a KIP that would expose more than one piece of metadata. At
> > > > the moment, I am motivated to keep this as simple as
> > > > possible, which means only exposing the lag, so it seems
> > > > better to introduce just "currentLag".
> > > > 
> > > > A later KIP could introduce some kind of "local metadata"
> > > > API and deprecate this method without harm. That later KIP
> > > > would be in a better position to take its time to find a
> > > > good name, consider which pieces of metadata would be nice
> > > > to have, etc.
> > > > 
> > > > Thanks sincerely for all your reviews,
> > > > John
> > > > 
> > > > On Wed, 2021-02-17 at 11:54 +, Chia-Ping Tsai wrote:
> > > > > That new solution is good to me, as it brings fewer changes than
> > > > > either the options or the config approach.
> > > > > 
> > > > > One minor question: could we return a composite object rather than
> > > > > an OptionalLong? For example:
> > > > > 
> > > > > class Metadata {
> > > > >   long lag;
> > > > > }
> > > > > 
> > > > > There are several pieces of metadata for a partition, and we may
> > > > > want to expose more information about a partition in the future.
> > > > > A composite object leaves room to carry more information without
> > > > > adding more public APIs to Consumer.
> > > > > 
> > > > > On 2021/02/17 05:21:06, John Roesler  wrote:
> > > > > > Hello again, all,
> > > > > > 
> > > > > > Thank you for your feedback and patience. I am hopeful that
> > > > > > I have been able to come up with a solution that will
> > > > > > satisfy everyone.
> > > > > > 
> > > > > > Under a previous design iteration of the desired task idling
> > > > > > semantics in KIP-695, we did indeed require the behavior
> > > > > > change, but while I was considering all of your feedback, I
> > > > > > realized that that requirement is no longer present.
> > > > > > 
> > > > > > Just a little more detail in case you are curious: I had
> > > > > > initially wanted to make the task idling semantics
> > > > > > absolutely free of weird timing effects based on how
> > > > > > frequently Streams happens to call poll, so I built in a
> > > > > > mechanism that would force Streams to get back a _fresh_ lag
> > > > > > response from poll before proceeding to enforce processing.
> > > > > > However, this resulted in a severe performance degradation,
> > > > > > so I backed off to use a cache of the lag metadata.
> > > > > > 
> > > > > > What I realized just now is that under this change, there's
> > > > > > no longer a need to return metadata (or change behavior) in
> > > > > > Consumer#poll at all. The lag cache in Streams would always
> > > > > > be identical to the one inside the Consumer's
> > > > > > SubscriptionState. Therefore, I can instead just expose the
> > > > > > Consumer's lag in a new API. Here is what I propose:
> > > > > > 
> > > > > > /**
> > > > > >  * Get the consumer's current lag on the partition. Returns
> > > > > >  * an "empty" {@link OptionalLong} if the lag is not known,

2021-02-22 Thread John Roesler
Thanks, Bill,

I was waiting for feedback on the "currentLag" proposal
before updating the KIP. Since there haven't been any
objections to my new proposal, I'm in the process of
updating the KIP document right now.

Personally, I still like the original proposal to expose the
metadata from the fetch responses as well, but there were
too many side-effects of going that route.

Once I finish updating the wiki page, I'll ping here again
to check for objections.

Thank you for taking another look!
-John

On Mon, 2021-02-22 at 11:19 -0500, Bill Bejeck wrote:
> Thanks for the KIP update John,
> 
> The KIP as it stands LGTM.
> 
> One question, in your previous comment you stated that the KIP would
> introduce the `currentLag()` method, but the KIP seems to show the
> `metadata()` implementation.
> FWIW I like the metadata approach as it seems to provide information the
> consumer has access to when it receives the fetch response.
> 
> Thanks again.
> 
> Bill
> 
> On Mon, Feb 22, 2021 at 10:51 AM John Roesler  wrote:
> 
> > Hello all,
> > 
> > Since there haven't been any big objections to my proposed
> > design fix, I'm updating the KIP now and opening the PR for
> > reviews. Hopefully, we can get this merged quickly and
> > unblock the system tests.
> > 
> > Thanks,
> > John
> > 
> > On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote:
> > > Thanks for that idea, Chia-Ping,
> > > 
> > > I agree that it would be nice for users to have a single API
> > > to get all kinds of metadata, but I'd rather introduce it in
> > > a KIP that would expose more than one piece of metadata. At
> > > the moment, I am motivated to keep this as simple as
> > > possible, which means only exposing the lag, so it seems
> > > better to introduce just "currentLag".
> > > 
> > > A later KIP could introduce some kind of "local metadata"
> > > API and deprecate this method without harm. That later KIP
> > > would be in a better position to take its time to find a
> > > good name, consider which pieces of metadata would be nice
> > > to have, etc.
> > > 
> > > Thanks sincerely for all your reviews,
> > > John
> > > 
> > > On Wed, 2021-02-17 at 11:54 +, Chia-Ping Tsai wrote:
> > > > That new solution is good to me, as it brings fewer changes than
> > > > either the options or the config approach.
> > > > 
> > > > One minor question: could we return a composite object rather than
> > > > an OptionalLong? For example:
> > > > 
> > > > class Metadata {
> > > >   long lag;
> > > > }
> > > > 
> > > > There are several pieces of metadata for a partition, and we may
> > > > want to expose more information about a partition in the future.
> > > > A composite object leaves room to carry more information without
> > > > adding more public APIs to Consumer.
> > > > 
> > > > On 2021/02/17 05:21:06, John Roesler  wrote:
> > > > > Hello again, all,
> > > > > 
> > > > > Thank you for your feedback and patience. I am hopeful that
> > > > > I have been able to come up with a solution that will
> > > > > satisfy everyone.
> > > > > 
> > > > > Under a previous design iteration of the desired task idling
> > > > > semantics in KIP-695, we did indeed require the behavior
> > > > > change, but while I was considering all of your feedback, I
> > > > > realized that that requirement is no longer present.
> > > > > 
> > > > > Just a little more detail in case you are curious: I had
> > > > > initially wanted to make the task idling semantics
> > > > > absolutely free of weird timing effects based on how
> > > > > frequently Streams happens to call poll, so I built in a
> > > > > mechanism that would force Streams to get back a _fresh_ lag
> > > > > response from poll before proceeding to enforce processing.
> > > > > However, this resulted in a severe performance degradation,
> > > > > so I backed off to use a cache of the lag metadata.
> > > > > 
> > > > > What I realized just now is that under this change, there's
> > > > > no longer a need to return metadata (or change behavior) in
> > > > > Consumer#poll at all. The lag cache in Streams would always
> > > > > be identical to the one inside the Consumer's
> > > > > SubscriptionState. Therefore, I can instead just expose the
> > > > > Consumer's lag in a new API. Here is what I propose:
> > > > > 
> > > > > /**
> > > > >  * Get the consumer's current lag on the partition. Returns
> > > > >  * an "empty" {@link OptionalLong} if the lag is not known,
> > > > >  * for example if there is no position yet, or if the end
> > > > >  * offset is not known yet.
> > > > >  *
> > > > >  * <p>
> > > > >  * This method uses locally cached metadata and never makes
> > > > >  * a remote call.
> > > > >  *
> > > > >  * @param topicPartition The partition to get the lag for.
> > > > >  *
> > > > >  * @return This {@code Consumer} instance's current lag for
> > > > >  * the given partition.
> > > > >  *
> > > > >  * @throws IllegalStateException if the {@code topicPartition} is not assigned
> > > > >  **/
> > > > > OptionalLong currentLag(

2021-02-22 Thread Bill Bejeck
Thanks for the KIP update John,

The KIP as it stands LGTM.

One question, in your previous comment you stated that the KIP would
introduce the `currentLag()` method, but the KIP seems to show the
`metadata()` implementation.
FWIW I like the metadata approach as it seems to provide information the
consumer has access to when it receives the fetch response.

Thanks again.

Bill

On Mon, Feb 22, 2021 at 10:51 AM John Roesler  wrote:

> Hello all,
>
> Since there haven't been any big objections to my proposed
> design fix, I'm updating the KIP now and opening the PR for
> reviews. Hopefully, we can get this merged quickly and
> unblock the system tests.
>
> Thanks,
> John
>
> On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote:
> > Thanks for that idea, Chia-Ping,
> >
> > I agree that it would be nice for users to have a single API
> > to get all kinds of metadata, but I'd rather introduce it in
> > a KIP that would expose more than one piece of metadata. At
> > the moment, I am motivated to keep this as simple as
> > possible, which means only exposing the lag, so it seems
> > better to introduce just "currentLag".
> >
> > A later KIP could introduce some kind of "local metadata"
> > API and deprecate this method without harm. That later KIP
> > would be in a better position to take its time to find a
> > good name, consider which pieces of metadata would be nice
> > to have, etc.
> >
> > Thanks sincerely for all your reviews,
> > John
> >
> > On Wed, 2021-02-17 at 11:54 +, Chia-Ping Tsai wrote:
> > > That new solution is good to me, as it brings fewer changes than
> > > either the options or the config approach.
> > >
> > > One minor question: could we return a composite object rather than
> > > an OptionalLong? For example:
> > >
> > > class Metadata {
> > >   long lag;
> > > }
> > >
> > > There are several pieces of metadata for a partition, and we may
> > > want to expose more information about a partition in the future.
> > > A composite object leaves room to carry more information without
> > > adding more public APIs to Consumer.
> > >
> > > On 2021/02/17 05:21:06, John Roesler  wrote:
> > > > Hello again, all,
> > > >
> > > > Thank you for your feedback and patience. I am hopeful that
> > > > I have been able to come up with a solution that will
> > > > satisfy everyone.
> > > >
> > > > Under a previous design iteration of the desired task idling
> > > > semantics in KIP-695, we did indeed require the behavior
> > > > change, but while I was considering all of your feedback, I
> > > > realized that that requirement is no longer present.
> > > >
> > > > Just a little more detail in case you are curious: I had
> > > > initially wanted to make the task idling semantics
> > > > absolutely free of weird timing effects based on how
> > > > frequently Streams happens to call poll, so I built in a
> > > > mechanism that would force Streams to get back a _fresh_ lag
> > > > response from poll before proceeding to enforce processing.
> > > > However, this resulted in a severe performance degradation,
> > > > so I backed off to use a cache of the lag metadata.
> > > >
> > > > What I realized just now is that under this change, there's
> > > > no longer a need to return metadata (or change behavior) in
> > > > Consumer#poll at all. The lag cache in Streams would always
> > > > be identical to the one inside the Consumer's
> > > > SubscriptionState. Therefore, I can instead just expose the
> > > > Consumer's lag in a new API. Here is what I propose:
> > > >
> > > > /**
> > > >  * Get the consumer's current lag on the partition. Returns
> > > >  * an "empty" {@link OptionalLong} if the lag is not known,
> > > >  * for example if there is no position yet, or if the end
> > > >  * offset is not known yet.
> > > >  *
> > > >  * <p>
> > > >  * This method uses locally cached metadata and never makes
> > > >  * a remote call.
> > > >  *
> > > >  * @param topicPartition The partition to get the lag for.
> > > >  *
> > > >  * @return This {@code Consumer} instance's current lag for
> > > >  * the given partition.
> > > >  *
> > > >  * @throws IllegalStateException if the {@code topicPartition} is not assigned
> > > >  **/
> > > > OptionalLong currentLag(TopicPartition topicPartition);
> > > >
> > > > With this new API, we have a handy way to find out the lag
> > > > of the consumer without ever incurring a remote call. There
> > > > is no unnecessarily low-level config option to confuse
> > > > users. And there is no change in the behavior of any
> > > > existing API to break users' programs.
> > > >
> > > > I have implemented this end-to-end in a preview PR:
> > > > https://github.com/apache/kafka/pull/10137
> > > >
> > > > If this proposal sounds good to all of you, then I will go
> > > > ahead and update the KIP.
> > > >
> > > > Sincerely yours,
> > > > -John
> > > >
> > > > On Thu, 2021-02-11 at 12:16 +, Chia-Ping Tsai wrote:
> > > > > here is my two cents. If the behavior eventually gets

2021-02-22 Thread John Roesler
Hello all,

Since there haven't been any big objections to my proposed
design fix, I'm updating the KIP now and opening the PR for
reviews. Hopefully, we can get this merged quickly and
unblock the system tests.

Thanks,
John
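The design fix referred to in this message (detailed in the quoted proposal below) has Streams consult the consumer's cached lag when deciding whether a task may stop idling. The following is a simplified, hypothetical model of that decision, not the actual Streams code: `TaskIdlingSketch`, `PartitionState`, `readyToProcess`, and the idle-deadline parameter are illustrative names. The rule assumed here is that an empty input partition blocks processing only while its cached lag is unknown or positive, and only until the idle deadline passes.

```java
import java.util.Map;
import java.util.OptionalLong;

public class TaskIdlingSketch {

    // Hypothetical per-partition view: records buffered locally, plus the
    // consumer's cached lag (empty if it is not known yet).
    static final class PartitionState {
        final int bufferedRecords;
        final OptionalLong cachedLag;

        PartitionState(int bufferedRecords, OptionalLong cachedLag) {
            this.bufferedRecords = bufferedRecords;
            this.cachedLag = cachedLag;
        }
    }

    // Returns true if the task may process now under the assumed rule:
    // each partition either has buffered records or is known to be caught up
    // (cached lag == 0); otherwise wait until the idle deadline, then enforce processing.
    static boolean readyToProcess(Map<String, PartitionState> partitions,
                                  long nowMs, long idleDeadlineMs) {
        for (PartitionState p : partitions.values()) {
            if (p.bufferedRecords > 0) {
                continue; // data available, nothing to wait for
            }
            boolean caughtUp = p.cachedLag.isPresent() && p.cachedLag.getAsLong() == 0L;
            if (!caughtUp) {
                // Lag unknown or positive: more data may still arrive.
                return nowMs >= idleDeadlineMs;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, PartitionState> parts = Map.of(
            "left-0", new PartitionState(3, OptionalLong.of(0L)),
            "right-0", new PartitionState(0, OptionalLong.empty()));
        System.out.println(readyToProcess(parts, 100L, 500L)); // false: lag of right-0 unknown
        System.out.println(readyToProcess(parts, 600L, 500L)); // true: idle deadline passed
    }
}
```

Because the cached lag is read locally, this check never waits on a fresh fetch response, which is the performance concern raised in the quoted proposal.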

On Mon, 2021-02-22 at 09:49 -0600, John Roesler wrote:
> Thanks for that idea, Chia-Ping,
> 
> I agree that it would be nice for users to have a single API
> to get all kinds of metadata, but I'd rather introduce it in
> a KIP that would expose more than one piece of metadata. At
> the moment, I am motivated to keep this as simple as
> possible, which means only exposing the lag, so it seems
> better to introduce just "currentLag".
> 
> A later KIP could introduce some kind of "local metadata"
> API and deprecate this method without harm. That later KIP
> would be in a better position to take its time to find a
> good name, consider which pieces of metadata would be nice
> to have, etc.
> 
> Thanks sincerely for all your reviews,
> John
> 
> On Wed, 2021-02-17 at 11:54 +, Chia-Ping Tsai wrote:
> > That new solution is good to me, as it brings fewer changes than either
> > the options or the config approach.
> > 
> > One minor question: could we return a composite object rather than
> > an OptionalLong? For example:
> > 
> > class Metadata {
> >   long lag;
> > }
> > 
> > There are several pieces of metadata for a partition, and we may want
> > to expose more information about a partition in the future. A composite
> > object leaves room to carry more information without adding more public
> > APIs to Consumer.
> > 
> > On 2021/02/17 05:21:06, John Roesler  wrote: 
> > > Hello again, all,
> > > 
> > > Thank you for your feedback and patience. I am hopeful that
> > > I have been able to come up with a solution that will
> > > satisfy everyone.
> > > 
> > > Under a previous design iteration of the desired task idling
> > > semantics in KIP-695, we did indeed require the behavior
> > > change, but while I was considering all of your feedback, I
> > > realized that that requirement is no longer present.
> > > 
> > > Just a little more detail in case you are curious: I had
> > > initially wanted to make the task idling semantics
> > > absolutely free of weird timing effects based on how
> > > frequently Streams happens to call poll, so I built in a
> > > mechanism that would force Streams to get back a _fresh_ lag
> > > response from poll before proceeding to enforce processing.
> > > However, this resulted in a severe performance degradation,
> > > so I backed off to use a cache of the lag metadata.
> > > 
> > > What I realized just now is that under this change, there's
> > > no longer a need to return metadata (or change behavior) in
> > > Consumer#poll at all. The lag cache in Streams would always
> > > be identical to the one inside the Consumer's
> > > SubscriptionState. Therefore, I can instead just expose the
> > > Consumer's lag in a new API. Here is what I propose:
> > > 
> > > /**
> > >  * Get the consumer's current lag on the partition. Returns
> > >  * an "empty" {@link OptionalLong} if the lag is not known,
> > >  * for example if there is no position yet, or if the end
> > >  * offset is not known yet.
> > >  *
> > >  * <p>
> > >  * This method uses locally cached metadata and never makes
> > >  * a remote call.
> > >  *
> > >  * @param topicPartition The partition to get the lag for.
> > >  *
> > >  * @return This {@code Consumer} instance's current lag for
> > >  * the given partition.
> > >  *
> > >  * @throws IllegalStateException if the {@code topicPartition} is not assigned
> > >  **/
> > > OptionalLong currentLag(TopicPartition topicPartition);
> > > 
> > > With this new API, we have a handy way to find out the lag
> > > of the consumer without ever incurring a remote call. There
> > > is no unnecessarily low-level config option to confuse
> > > users. And there is no change in the behavior of any
> > > existing API to break users' programs.
> > > 
> > > I have implemented this end-to-end in a preview PR:
> > > https://github.com/apache/kafka/pull/10137
> > > 
> > > If this proposal sounds good to all of you, then I will go
> > > ahead and update the KIP.
> > > 
> > > Sincerely yours,
> > > -John
> > > 
> > > On Thu, 2021-02-11 at 12:16 +, Chia-Ping Tsai wrote:
> > > > Here are my two cents. If the behavior eventually gets changed (return
> > > > on response), the config is more suitable, as it is easier to deprecate
> > > > (fewer changes). For example, we can introduce the config in 2.8 and
> > > > then deprecate it in 2.9. 3.0 then removes the config and supports only
> > > > return-on-response.
> > > > 
> > > > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> > > > > Hey John, I know I'm a bit late to this party but just for the record,
> > > > > I don't think it's *totally* unreasonable for a user to take up the
> > > > > "poll on max timeout and assume some records will be returned" approach.

2021-02-22 Thread John Roesler
Thanks for that idea, Chia-Ping,

I agree that it would be nice for users to have a single API
to get all kinds of metadata, but I'd rather introduce it in
a KIP that would expose more than one piece of metadata. At
the moment, I am motivated to keep this as simple as
possible, which means only exposing the lag, so it seems
better to introduce just "currentLag".

A later KIP could introduce some kind of "local metadata"
API and deprecate this method without harm. That later KIP
would be in a better position to take its time to find a
good name, consider which pieces of metadata would be nice
to have, etc.

Thanks sincerely for all your reviews,
John
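As a minimal sketch of the `currentLag` contract proposed in this thread (an empty `OptionalLong` when the position or end offset is not yet known, an `IllegalStateException` for an unassigned partition, and no remote call), the following models the consumer's local bookkeeping with plain maps. `LagCacheSketch` and its `assign`/`update*` methods are hypothetical stand-ins for the consumer's internal state, and partitions are plain strings rather than `TopicPartition`; with a real consumer one would call the proposed `Consumer#currentLag(TopicPartition)` instead.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical model of the consumer-local state a lag lookup reads from.
public class LagCacheSketch {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();
    private final Map<String, Long> endOffsets = new HashMap<>();

    void assign(String partition) { assigned.add(partition); }
    void updatePosition(String partition, long offset) { positions.put(partition, offset); }
    void updateEndOffset(String partition, long offset) { endOffsets.put(partition, offset); }

    // Follows the proposed contract: purely local lookup, empty when the
    // position or end offset is not known yet, exception if not assigned.
    OptionalLong currentLag(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("Partition " + partition + " is not assigned");
        }
        Long position = positions.get(partition);
        Long endOffset = endOffsets.get(partition);
        if (position == null || endOffset == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(endOffset - position);
    }

    public static void main(String[] args) {
        LagCacheSketch cache = new LagCacheSketch();
        cache.assign("input-0");
        System.out.println(cache.currentLag("input-0")); // OptionalLong.empty
        cache.updatePosition("input-0", 40L);
        cache.updateEndOffset("input-0", 42L);
        System.out.println(cache.currentLag("input-0")); // OptionalLong[2]
    }
}
```

Returning `OptionalLong` rather than a sentinel such as `-1` forces callers to handle the "not known yet" case explicitly.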

On Wed, 2021-02-17 at 11:54 +, Chia-Ping Tsai wrote:
> That new solution is good to me, as it brings fewer changes than either
> the options or the config approach.
> 
> One minor question: could we return a composite object rather than
> an OptionalLong? For example:
> 
> class Metadata {
>   long lag;
> }
> 
> There are several pieces of metadata for a partition, and we may want to
> expose more information about a partition in the future. A composite object
> leaves room to carry more information without adding more public APIs to
> Consumer.
> 
> On 2021/02/17 05:21:06, John Roesler  wrote: 
> > Hello again, all,
> > 
> > Thank you for your feedback and patience. I am hopeful that
> > I have been able to come up with a solution that will
> > satisfy everyone.
> > 
> > Under a previous design iteration of the desired task idling
> > semantics in KIP-695, we did indeed require the behavior
> > change, but while I was considering all of your feedback, I
> > realized that that requirement is no longer present.
> > 
> > Just a little more detail in case you are curious: I had
> > initially wanted to make the task idling semantics
> > absolutely free of weird timing effects based on how
> > frequently Streams happens to call poll, so I built in a
> > mechanism that would force Streams to get back a _fresh_ lag
> > response from poll before proceeding to enforce processing.
> > However, this resulted in a severe performance degradation,
> > so I backed off to use a cache of the lag metadata.
> > 
> > What I realized just now is that under this change, there's
> > no longer a need to return metadata (or change behavior) in
> > Consumer#poll at all. The lag cache in Streams would always
> > be identical to the one inside the Consumer's
> > SubscriptionState. Therefore, I can instead just expose the
> > Consumer's lag in a new API. Here is what I propose:
> > 
> > /**
> >  * Get the consumer's current lag on the partition. Returns
> >  * an "empty" {@link OptionalLong} if the lag is not known,
> >  * for example if there is no position yet, or if the end
> >  * offset is not known yet.
> >  *
> >  * <p>
> >  * This method uses locally cached metadata and never makes
> >  * a remote call.
> >  *
> >  * @param topicPartition The partition to get the lag for.
> >  *
> >  * @return This {@code Consumer} instance's current lag for
> >  * the given partition.
> >  *
> >  * @throws IllegalStateException if the {@code topicPartition} is not assigned
> >  **/
> > OptionalLong currentLag(TopicPartition topicPartition);
> > 
> > With this new API, we have a handy way to find out the lag
> > of the consumer without ever incurring a remote call. There
> > is no unnecessarily low-level config option to confuse
> > users. And there is no change in the behavior of any
> > existing API to break users' programs.
> > 
> > I have implemented this end-to-end in a preview PR:
> > https://github.com/apache/kafka/pull/10137
> > 
> > If this proposal sounds good to all of you, then I will go
> > ahead and update the KIP.
> > 
> > Sincerely yours,
> > -John
> > 
> > On Thu, 2021-02-11 at 12:16 +, Chia-Ping Tsai wrote:
> > > Here are my two cents. If the behavior eventually gets changed (return on
> > > response), the config is more suitable, as it is easier to deprecate
> > > (fewer changes). For example, we can introduce the config in 2.8 and then
> > > deprecate it in 2.9. 3.0 then removes the config and supports only
> > > return-on-response.
> > > 
> > > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> > > > Hey John, I know I'm a bit late to this party but just for the record,
> > > > I don't think it's *totally* unreasonable for a user to take up the
> > > > "poll on max timeout and assume some records will be returned" approach.
> > > > And I can also imagine plenty of manually-assigned consumers that are
> > > > implemented to do exactly that.
> > > > 
> > > > It's too bad that we're hitting this during the 2.8 release. If we were
> > > > having this discussion in the context of 3.0, then I'd say go for it,
> > > > since it's a breaking change that would just require some modification
> > > > to the applications' poll loop.
> > > > 
> > > > A good analogy here seems to be spurious wakeups -- you generally
> > > > assume that a wa

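Sophie's point in the quoted message above concerns applications whose poll loop assumes that a poll with a long timeout always returns records. By analogy with spurious wakeups, a defensive loop treats an early empty return as a signal to re-check and poll again. This is a minimal sketch, not code from the thread: the `Poller` interface is a hypothetical stand-in for `Consumer#poll(Duration)`, records are plain strings, and the clock is injected so the loop can be exercised without a broker.

```java
import java.util.List;
import java.util.function.LongSupplier;

public class PollLoopSketch {

    // Hypothetical stand-in for consumer.poll(Duration): may return an empty
    // batch before the timeout has elapsed.
    interface Poller {
        List<String> poll(long timeoutMs);
    }

    // Keeps polling until records arrive or the overall timeout is used up.
    // An early empty return is treated like a spurious wakeup: re-check and poll again.
    static List<String> pollUntilRecords(Poller poller, long timeoutMs, LongSupplier clockMs) {
        long deadline = clockMs.getAsLong() + timeoutMs;
        while (true) {
            long remaining = deadline - clockMs.getAsLong();
            if (remaining <= 0) {
                return List.of(); // timed out without records
            }
            List<String> batch = poller.poll(remaining);
            if (!batch.isEmpty()) {
                return batch;
            }
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        int[] calls = {0};
        // Fake poller: each call advances the fake clock by 10 ms and returns
        // nothing until the third call.
        Poller poller = timeout -> {
            now[0] += 10;
            calls[0]++;
            if (calls[0] < 3) {
                return List.of();
            }
            return List.of("record-1");
        };
        System.out.println(pollUntilRecords(poller, 1_000L, () -> now[0])); // [record-1]
    }
}
```

A loop written this way keeps working whether `poll` blocks for the full timeout or returns early on a fetch response.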

2021-02-17 Thread Chia-Ping Tsai
That new solution is good to me, as it brings fewer changes than either the
options or the config approach.

One minor question: could we return a composite object rather than an
OptionalLong? For example:

class Metadata {
  long lag;
}

There are several pieces of metadata for a partition, and we may want to
expose more information about a partition in the future. A composite object
leaves room to carry more information without adding more public APIs to Consumer.

On 2021/02/17 05:21:06, John Roesler  wrote: 
> Hello again, all,
> 
> Thank you for your feedback and patience. I am hopeful that
> I have been able to come up with a solution that will
> satisfy everyone.
> 
> Under a previous design iteration of the desired task idling
> semantics in KIP-695, we did indeed require the behavior
> change, but while I was considering all of your feedback, I
> realized that that requirement is no longer present.
> 
> Just a little more detail in case you are curious: I had
> initially wanted to make the task idling semantics
> absolutely free of weird timing effects based on how
> frequently Streams happens to call poll, so I built in a
> mechanism that would force Streams to get back a _fresh_ lag
> response from poll before proceeding to enforce processing.
> However, this resulted in a severe performance degradation,
> so I backed off to use a cache of the lag metadata.
> 
> What I realized just now is that under this change, there's
> no longer a need to return metadata (or change behavior) in
> Consumer#poll at all. The lag cache in Streams would always
> be identical to the one inside the Consumer's
> SubscriptionState. Therefore, I can instead just expose the
> Consumer's lag in a new API. Here is what I propose:
> 
> /**
>  * Get the consumer's current lag on the partition. Returns an "empty"
>  * {@link OptionalLong} if the lag is not known, for example if there is
>  * no position yet, or if the end offset is not known yet.
>  *
>  *
>  * This method uses locally cached metadata and never makes a remote call.
>  *
>  * @param topicPartition The partition to get the lag for.
>  *
>  * @return This {@code Consumer} instance's current lag for the given partition.
>  *
>  * @throws IllegalStateException if the {@code topicPartition} is not assigned
>  **/
> @Override
> public OptionalLong currentLag(
>   TopicPartition topicPartition
> );
> 
> 
> 
> With this new API, we have a handy way to find out the lag
> of the consumer without ever incurring a remote call. There
> is no unnecessarily low-level config option to confuse
> users. And there is no change in the behavior of any
> existing API to break users' programs.
> 
> I have implemented this end-to-end in a preview PR:
> https://github.com/apache/kafka/pull/10137
> 
> If this proposal sounds good to all of you, then I will go
> ahead and update the KIP.
> 
> Sincerely yours,
> -John
> 
> On Thu, 2021-02-11 at 12:16 +, Chia-Ping Tsai wrote:
> > Here are my two cents. If the behavior eventually gets changed (return on
> > response), the config is more suitable, as it is easier to deprecate
> > (fewer changes). For example, we can introduce the config in 2.8,
> > deprecate it in 2.9, and remove it in 3.0, supporting only
> > return-on-response.
> > 
> > 
> > 
> > On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> > > Hey John, I know I'm a bit late to this party but just for the record,
> > > I don't think it's *totally* unreasonable for a user to take up the "poll
> > > on max timeout and assume some records will be returned" approach.
> > > And I also can imagine plenty of manually-assigned consumers
> > > implemented doing exactly that.
> > > 
> > > It's too bad that we're hitting this during the 2.8 release. If we were
> > > having this discussion in the context of 3.0, then I'd say go for it, since
> > > it's a breaking change that would just require some modification to the
> > > applications' poll loop.
> > > 
> > > A good analogy here seems to be spurious wakeups -- you generally
> > > assume that a waiting thread has woken up due to a notify event in
> > > another thread, but the docs always make it very clear up front that
> > > this can happen "spuriously" and therefore you need to recheck whatever
> > > condition you were waiting on before assuming the thread should proceed.
> > > 
> > > Since we *didn't* document this possibility up front in the case of poll(),
> > > it seems unfair to suddenly change the behavior in a supposedly non-breaking
> > > release. Imagine how many programs would break if spurious wakeups
> > > were suddenly introduced in a release, rather than warned about from
> > > the get-go (not a perfect analogy: far more programs rely on wait/notify
> > > than on poll() returning records, but I think the point still stands).
> > > 
> > > For the record, I also agree with Ismael that a config doesn't feel ideal.
> > > There are already enough configs to present a steep learning curve, so
> > 

Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-16 Thread John Roesler
Hello again, all,

Thank you for your feedback and patience. I am hopeful that
I have been able to come up with a solution that will
satisfy everyone.

Under a previous design iteration of the desired task idling
semantics in KIP-695, we did indeed require the behavior
change, but while I was considering all of your feedback, I
realized that that requirement is no longer present.

Just a little more detail in case you are curious: I had
initially wanted to make the task idling semantics
absolutely free of weird timing effects based on how
frequently Streams happens to call poll, so I built in a
mechanism that would force Streams to get back a _fresh_ lag
response from poll before proceeding to enforce processing.
However, this resulted in a severe performance degradation,
so I backed off to use a cache of the lag metadata.

What I realized just now is that under this change, there's
no longer a need to return metadata (or change behavior) in
Consumer#poll at all. The lag cache in Streams would always
be identical to the one inside the Consumer's
SubscriptionState. Therefore, I can instead just expose the
Consumer's lag in a new API. Here is what I propose:

/**
 * Get the consumer's current lag on the partition. Returns an "empty"
 * {@link OptionalLong} if the lag is not known, for example if there is
 * no position yet, or if the end offset is not known yet.
 *
 *
 * This method uses locally cached metadata and never makes a remote call.
 *
 * @param topicPartition The partition to get the lag for.
 *
 * @return This {@code Consumer} instance's current lag for the given partition.
 *
 * @throws IllegalStateException if the {@code topicPartition} is not assigned
@Override
public OptionalLong currentLag(
  TopicPartition topicPartition
);
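To make the documented semantics concrete, here is a small in-memory model of them. This is a hypothetical sketch, not the Consumer implementation: partitions are plain strings instead of `TopicPartition`, and `LocalLagView`, `updatePosition`, and `updateEndOffset` are illustrative names. Computing the lag as end offset minus position is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model of the proposed currentLag() contract:
// empty if position or end offset is unknown, IllegalStateException
// if the partition is not assigned, and no remote calls at all.
final class LocalLagView {
    // Per-partition state; null means "not known yet".
    private static final class State {
        Long position;
        Long endOffset;
    }

    private final Map<String, State> assigned = new HashMap<>();

    void assign(String partition) {
        assigned.putIfAbsent(partition, new State());
    }

    void updatePosition(String partition, long position) {
        stateOf(partition).position = position;
    }

    void updateEndOffset(String partition, long endOffset) {
        stateOf(partition).endOffset = endOffset;
    }

    // Reads only locally cached state.
    OptionalLong currentLag(String partition) {
        State s = stateOf(partition);
        if (s.position == null || s.endOffset == null) {
            return OptionalLong.empty(); // no position or end offset yet
        }
        return OptionalLong.of(s.endOffset - s.position);
    }

    private State stateOf(String partition) {
        State s = assigned.get(partition);
        if (s == null) {
            throw new IllegalStateException("Partition not assigned: " + partition);
        }
        return s;
    }
}
```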



With this new API, we have a handy way to find out the lag
of the consumer without ever incurring a remote call. There
is no unnecessarily low-level config option to confuse
users. And there is no change in the behavior of any
existing API to break users' programs.
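As an illustration of how a caller could use such a lag lookup for task idling, here is a hypothetical readiness check. The rule it encodes (proceed when every partition has buffered records or a known lag of zero, and keep waiting when the lag is unknown) is an assumption for the example, not the exact KIP-695 specification; `IdlingCheck` and `readyToProcess` are made-up names.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical readiness check built on a local, non-remote lag lookup
// such as the proposed Consumer#currentLag. Partitions are plain strings.
final class IdlingCheck {
    private IdlingCheck() {}

    /**
     * Returns true if every partition either has buffered records or is
     * known to have zero lag (nothing more to fetch). An unknown lag is
     * treated conservatively: the caller should keep waiting.
     */
    static boolean readyToProcess(Map<String, Integer> bufferedCounts,
                                  Function<String, OptionalLong> currentLag) {
        for (Map.Entry<String, Integer> e : bufferedCounts.entrySet()) {
            if (e.getValue() > 0) {
                continue; // data already buffered for this partition
            }
            OptionalLong lag = currentLag.apply(e.getKey());
            if (!lag.isPresent() || lag.getAsLong() > 0) {
                return false; // lag unknown, or records still to be fetched
            }
        }
        return true;
    }
}
```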

I have implemented this end-to-end in a preview PR:
https://github.com/apache/kafka/pull/10137

If this proposal sounds good to all of you, then I will go
ahead and update the KIP.

Sincerely yours,
-John

On Thu, 2021-02-11 at 12:16 +, Chia-Ping Tsai wrote:
> Here are my two cents. If the behavior eventually gets changed (return on
> response), the config is more suitable, as it is easier to deprecate (fewer
> changes). For example, we can introduce the config in 2.8, deprecate it in
> 2.9, and remove it in 3.0, supporting only return-on-response.
> 
> 
> 
> On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> > Hey John, I know I'm a bit late to this party but just for the record,
> > I don't think it's *totally* unreasonable for a user to take up the "poll
> > on max timeout and assume some records will be returned" approach.
> > And I also can imagine plenty of manually-assigned consumers
> > implemented doing exactly that.
> > 
> > It's too bad that we're hitting this during the 2.8 release. If we were
> > having this discussion in the context of 3.0, then I'd say go for it, since
> > it's a breaking change that would just require some modification to the
> > applications' poll loop.
> > 
> > A good analogy here seems to be spurious wakeups -- you generally
> > assume that a waiting thread has woken up due to a notify event in
> > another thread, but the docs always make it very clear up front that
> > this can happen "spuriously" and therefore you need to recheck whatever
> > condition you were waiting on before assuming the thread should proceed.
> > 
> > Since we *didn't* document this possibility up front in the case of poll(),
> > it seems unfair to suddenly change the behavior in a supposedly non-breaking
> > release. Imagine how many programs would break if spurious wakeups
> > were suddenly introduced in a release, rather than warned about from
> > the get-go (not a perfect analogy: far more programs rely on wait/notify
> > than on poll() returning records, but I think the point still stands).
> > 
> > For the record, I also agree with Ismael that a config doesn't feel ideal.
> > There are already enough configs to present a steep learning curve, so
> > I would avoid adding one more wherever possible. And it does indeed
> > seem possible to avoid here, since it's really just a boolean flag (rather
> > than a semi-unbounded space, eg max.poll.interval.ms, or a constant
> > value, eg group.id, where a config does feel appropriate).
> > 
> > Given all that, I would personally advocate for the pollOptions overload.
> > The obvious advantages here are:
> > 1) it's more future-proof, in that we can avoid having a similar discussion
> > if/when we want to consider other semantics changes to poll which some
> > users may want while others would not
> > 2) it leaves the door open to using poll with either semantics in a single
> > consumer. I doubt that's going to be very common 

Re: Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-11 Thread Chia-Ping Tsai
Here are my two cents. If the behavior eventually gets changed (return on
response), the config is more suitable, as it is easier to deprecate (fewer
changes). For example, we can introduce the config in 2.8, deprecate it in 2.9,
and remove it in 3.0, supporting only return-on-response.



On 2021/02/10 19:59:39 Sophie Blee-Goldman wrote:
> Hey John, I know I'm a bit late to this party but just for the record,
> I don't think it's *totally* unreasonable for a user to take up the "poll
> on max timeout and assume some records will be returned" approach.
> And I also can imagine plenty of manually-assigned consumers
> implemented doing exactly that.
> 
> It's too bad that we're hitting this during the 2.8 release. If we were
> having this discussion in the context of 3.0, then I'd say go for it, since
> it's a breaking change that would just require some modification to the
> applications' poll loop.
> 
> A good analogy here seems to be spurious wakeups -- you generally
> assume that a waiting thread has woken up due to a notify event in
> another thread, but the docs always make it very clear up front that
> this can happen "spuriously" and therefore you need to recheck whatever
> condition you were waiting on before assuming the thread should proceed.
> 
> Since we *didn't* document this possibility up front in the case of poll(),
> it seems unfair to suddenly change the behavior in a supposedly non-breaking
> release. Imagine how many programs would break if spurious wakeups
> were suddenly introduced in a release, rather than warned about from
> the get-go (not a perfect analogy: far more programs rely on wait/notify
> than on poll() returning records, but I think the point still stands).
> 
> For the record, I also agree with Ismael that a config doesn't feel ideal.
> There are already enough configs to present a steep learning curve, so
> I would avoid adding one more wherever possible. And it does indeed
> seem possible to avoid here, since it's really just a boolean flag (rather
> than a semi-unbounded space, eg max.poll.interval.ms, or a constant
> value, eg group.id, where a config does feel appropriate).
> 
> Given all that, I would personally advocate for the pollOptions overload.
> The obvious advantages here are:
> 1) it's more future-proof, in that we can avoid having a similar discussion
> if/when we want to consider other semantics changes to poll which some
> users may want while others would not
> 2) it leaves the door open to using poll with either semantics in a single
> consumer. I doubt that's going to be very common in terms of the specific
> option we're discussing here, but it may be more useful for other options
> we may add in the future
> 
> Just my 2 cents. But if the pollOptions proposal would really add so much
> additional work that it would cause the 2.8 release to be significantly
> delayed, then that's worth taking into account as well.
> 
> On Wed, Feb 10, 2021 at 9:35 AM John Roesler  wrote:
> 
> > Hello again, all.
> >
> > I have submitted the PR:
> > https://github.com/apache/kafka/pull/10096
> >
> > Ismael chimed in on the PR review to indicate that the
> > config approach may not be desirable.
> >
> > How strongly do we feel that the behavior change is
> > unacceptable? It seems like most of the people involved felt
> > the behavior change is ok (although the docs were wrong).
> >
> > The arguments against the behavior change were plausible,
> > but hypothetical.
> >
> > Can everyone take a look at the PR and weigh in on whether
> > the complexity of an extra config option is really worth it
> > in this case?
> >
> > I have to confess I'm currently leaning more toward dropping
> > the config and going back to the behavior change, while
> > correcting the docs and the system test.
> >
> > While we are wavering on this point, the system tests
> > continue to fail, and the 2.8.0 release is blocked. We
> > should aim to make a call today.
> >
> > Thanks all,
> > -John
> >
> > On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote:
> > > Thanks everyone for chiming in here! I'd also prefer the config approach
> > > if compared with API changes.
> > >
> > > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck  wrote:
> > >
> > > > I meant to chime in earlier.
> > > >
> > > > I also like the `PollOptions` idea, but I have to agree that the config
> > > > option would be the least disruptive approach.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler  wrote:
> > > >
> > > > > Thanks, all!
> > > > >
> > > > > It seems that the config I proposed is a solution that
> > > > > everyone can be happy with, so I will go ahead with a PR to
> > > > > fix that.
> > > > >
> > > > > I'll update the KIP after a round of PR reviews, in case
> > > > > there are new concerns that arise.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > > > > Thanks for 

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-10 Thread Sophie Blee-Goldman
Hey John, I know I'm a bit late to this party but just for the record,
I don't think it's *totally* unreasonable for a user to take up the "poll
on max timeout and assume some records will be returned" approach.
And I also can imagine plenty of manually-assigned consumers
implemented doing exactly that.

It's too bad that we're hitting this during the 2.8 release. If we were
having this discussion in the context of 3.0, then I'd say go for it, since
it's a breaking change that would just require some modification to the
applications' poll loop.
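As a sketch of the kind of poll-loop change meant here, assuming a hypothetical `pollUntilRecords` helper (not a Kafka API) in which `poll` stands in for something like `Consumer#poll(Duration)`, an application that tolerates empty returns simply loops and polls again:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not a Kafka API. An empty result is treated as
// "check again", never as proof that records must have arrived or that
// the full timeout elapsed.
final class RobustPollLoop {
    private RobustPollLoop() {}

    static List<String> pollUntilRecords(Function<Duration, List<String>> poll,
                                         Duration timeout,
                                         int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            List<String> records = poll.apply(timeout);
            if (!records.isEmpty()) {
                return records;
            }
            // Empty return: loop and poll again rather than failing.
        }
        // Bounded only for illustration; real loops often run until shutdown.
        return Collections.emptyList();
    }
}
```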

A good analogy here seems to be spurious wakeups -- you generally
assume that a waiting thread has woken up due to a notify event in
another thread, but the docs always make it very clear up front that
this can happen "spuriously" and therefore you need to recheck whatever
condition you were waiting on before assuming the thread should proceed.
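For reference, the standard Java guarded-wait idiom the analogy refers to looks roughly like this (the `ReadyFlag` class name is made up for illustration):

```java
// Guarded-wait idiom: Object.wait() may return without a matching
// notify ("spurious wakeup"), so the condition is rechecked in a loop.
final class ReadyFlag {
    private boolean ready = false;

    synchronized void awaitReady() throws InterruptedException {
        while (!ready) { // recheck after every wakeup, spurious or not
            wait();
        }
    }

    synchronized void markReady() {
        ready = true;
        notifyAll(); // wake all waiters; each rechecks `ready` itself
    }
}
```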

Since we *didn't* document this possibility up front in the case of poll(),
it seems unfair to suddenly change the behavior in a supposedly non-breaking
release. Imagine how many programs would break if spurious wakeups
were suddenly introduced in a release, rather than warned about from
the get-go (not a perfect analogy: far more programs rely on wait/notify
than on poll() returning records, but I think the point still stands).

For the record, I also agree with Ismael that a config doesn't feel ideal.
There are already enough configs to present a steep learning curve, so
I would avoid adding one more wherever possible. And it does indeed
seem possible to avoid here, since it's really just a boolean flag (rather
than a semi-unbounded space, eg max.poll.interval.ms, or a constant
value, eg group.id, where a config does feel appropriate).

Given all that, I would personally advocate for the pollOptions overload.
The obvious advantages here are:
1) it's more future-proof, in that we can avoid having a similar discussion
if/when we want to consider other semantics changes to poll which some
users may want while others would not
2) it leaves the door open to using poll with either semantics in a single
consumer. I doubt that's going to be very common in terms of the specific
option we're discussing here, but it may be more useful for other options
we may add in the future
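A rough sketch of what a poll-options object could look like, assuming hypothetical names (`PollOptions`, `ReturnMode`, `returnOnResponse()`) that do not exist in the Kafka client. Because the mode travels with each call, one consumer could use either semantics per poll, which is advantage 2) above:

```java
import java.time.Duration;

// Hypothetical poll-options object; none of these names are Kafka APIs.
final class PollOptions {
    enum ReturnMode { ON_RECORDS, ON_RESPONSE }

    private final Duration timeout;
    private final ReturnMode returnMode;

    private PollOptions(Duration timeout, ReturnMode returnMode) {
        this.timeout = timeout;
        this.returnMode = returnMode;
    }

    // Defaults to the existing behavior: return early only on records.
    static PollOptions of(Duration timeout) {
        return new PollOptions(timeout, ReturnMode.ON_RECORDS);
    }

    // Returns a new, immutable options object; `this` is unchanged.
    PollOptions returnOnResponse() {
        return new PollOptions(timeout, ReturnMode.ON_RESPONSE);
    }

    Duration timeout() { return timeout; }

    ReturnMode returnMode() { return returnMode; }
}
```

A call site might then read `consumer.poll(PollOptions.of(Duration.ofMillis(100)).returnOnResponse())`, but that overload is purely hypothetical.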

Just my 2 cents. But if the pollOptions proposal would really add so much
additional work that it would cause the 2.8 release to be significantly
delayed, then that's worth taking into account as well.

On Wed, Feb 10, 2021 at 9:35 AM John Roesler  wrote:

> Hello again, all.
>
> I have submitted the PR:
> https://github.com/apache/kafka/pull/10096
>
> Ismael chimed in on the PR review to indicate that the
> config approach may not be desirable.
>
> How strongly do we feel that the behavior change is
> unacceptable? It seems like most of the people involved felt
> the behavior change is ok (although the docs were wrong).
>
> The arguments against the behavior change were plausible,
> but hypothetical.
>
> Can everyone take a look at the PR and weigh in on whether
> the complexity of an extra config option is really worth it
> in this case?
>
> I have to confess I'm currently leaning more toward dropping
> the config and going back to the behavior change, while
> correcting the docs and the system test.
>
> While we are wavering on this point, the system tests
> continue to fail, and the 2.8.0 release is blocked. We
> should aim to make a call today.
>
> Thanks all,
> -John
>
> On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote:
> > Thanks everyone for chiming in here! I'd also prefer the config approach
> > if compared with API changes.
> >
> > On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck  wrote:
> >
> > > I meant to chime in earlier.
> > >
> > > I also like the `PollOptions` idea, but I have to agree that the config
> > > option would be the least disruptive approach.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Fri, Feb 5, 2021 at 6:12 PM John Roesler  wrote:
> > >
> > > > Thanks, all!
> > > >
> > > > It seems that the config I proposed is a solution that
> > > > everyone can be happy with, so I will go ahead with a PR to
> > > > fix that.
> > > >
> > > > I'll update the KIP after a round of PR reviews, in case
> > > > there are new concerns that arise.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > > > Thanks for providing more details.
> > > > >
> > > > > Adding a config might be the way of least resistance... I am fine
> > > > > with that.
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > > > vvv
> > > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > >
> > > > > > This idea LGTM. It not only makes minimal changes to current
> > > > > > behavior but also works for KIP-695.
> > > > > >
> > > > > > On 2021/02/04 16:07:11, John Roesler  wrote:
> > > > > > > Hi Matthias, Chia-Ping, and Tom,
> > > 

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-10 Thread John Roesler
Hello again, all.

I have submitted the PR:
https://github.com/apache/kafka/pull/10096

Ismael chimed in on the PR review to indicate that the
config approach may not be desirable.

How strongly do we feel that the behavior change is
unacceptable? It seems like most of the people involved felt
the behavior change is ok (although the docs were wrong).

The arguments against the behavior change were plausible,
but hypothetical.

Can everyone take a look at the PR and weigh in on whether
the complexity of an extra config option is really worth it
in this case?

I have to confess I'm currently leaning more toward dropping
the config and going back to the behavior change, while
correcting the docs and the system test.

While we are wavering on this point, the system tests
continue to fail, and the 2.8.0 release is blocked. We
should aim to make a call today.

Thanks all,
-John

On Fri, 2021-02-05 at 15:31 -0800, Guozhang Wang wrote:
> Thanks everyone for chiming in here! I'd also prefer the config approach if
> compared with API changes.
> 
> On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck  wrote:
> 
> > I meant to chime in earlier.
> > 
> > I also like the `PollOptions` idea, but I have to agree that the config
> > option would be the least disruptive approach.
> > 
> > Thanks,
> > Bill
> > 
> > On Fri, Feb 5, 2021 at 6:12 PM John Roesler  wrote:
> > 
> > > Thanks, all!
> > > 
> > > It seems that the config I proposed is a solution that
> > > everyone can be happy with, so I will go ahead with a PR to
> > > fix that.
> > > 
> > > I'll update the KIP after a round of PR reviews, in case
> > > there are new concerns that arise.
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > > Thanks for providing more details.
> > > > 
> > > > Adding a config might be the way of least resistance... I am fine with
> > > > that.
> > > > 
> > > > -Matthias
> > > > 
> > > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > > vvv
> > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > 
> > > > > This idea LGTM. It not only makes minimal changes to current behavior
> > > > > but also works for KIP-695.
> > > > > 
> > > > > On 2021/02/04 16:07:11, John Roesler  wrote:
> > > > > > Hi Matthias, Chia-Ping, and Tom,
> > > > > > 
> > > > > > Thanks for the thoughtful replies!
> > > > > > 
> > > > > > Re: poll(~forever~) to block indefinitely on records:
> > > > > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > > > > personally recommend for anyone to write code that blocks
> > > > > > forever on I/O, I do agree this is something that "real
> > > > > > people" may want to do.
> > > > > > 
> > > > > > Just a note for the record, this approach should only be
> > > > > > used in conjunction with a manual assignment. If people are
> > > > > > using a group subscription, they're setting themselves up to
> > > > > > get kicked out of the group when there is low volume of
> > > > > > updates on the topic. And then, when they get kicked out,
> > > > > > they will never know it because they're just going to be
> > > > > > blocked in `poll()` the whole time.
> > > > > > 
> > > > > > However, if you don't participate in a group and just:
> > > > > > 1 assign(partitions)
> > > > > > 2 poll(forever),
> > > > > > you should indeed expect to return from poll only when you
> > > > > > have records.
> > > > > > 
> > > > > > This possibility is the turning point for me. I'd like to
> > > > > > alter my proposal to an opt-in config, detailed below.
> > > > > > 
> > > > > > Re: Javadoc:
> > > > > > Thanks for pointing that out. It does seem like, if we do
> > > > > > decide to change behavior, we should adjust the Javadoc to
> > > > > > say so. That was an oversight on my part, and I daresay that
> > > > > > if I had done that initially, it would have saved Rajini
> > > > > > from having to dig into the code to pinpoint the cause of
> > > > > > those test failures.
> > > > > > 
> > > > > > Re: PollOptions:
> > > > > > I actually like this option quite a bit. It seems like this
> > > > > > would be warranted if we expect someone to want to use the
> > > > > > same Consumer instance in both "return on metadata or
> > > > > > records" and "return on only records" mode. Otherwise, we
> > > > > > might as well introduce a new config.
> > > > > > 
> > > > > > It also seems like the behavior I proposed in this KIP is
> > > > > > somewhat "advanced", so I could certainly see leaving it off
> > > > > > by default and offering an opt-in config.
> > > > > > 
> > > > > > How does everyone feel about this opt-in config:
> > > > > > 
> > > > > > vvv
> > > > > > long_poll.mode: return_on_records|return_on_response
> > > > > > 
> > > > > > doc:
> > > > > > * return_on_records: (default) a call to
> > > > > > Consumer#poll(timeout) will block up to the timeout and
> > > > > > return early if records are received.
> > > > > > * return_on_response: a call to Cons

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-05 Thread Guozhang Wang
Thanks everyone for chiming in here! I'd also prefer the config approach if
compared with API changes.
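For comparison, the `long_poll.mode` config discussed further down in this thread (a proposal only, not a released Kafka setting) could be modeled like this. `LongPollMode` and its methods are hypothetical; the early-return rule follows the doc text John posted for the two values:

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical model of the proposed long_poll.mode config. It captures
// only the early-return rule from the proposal's doc text.
final class LongPollMode {
    enum Mode { RETURN_ON_RECORDS, RETURN_ON_RESPONSE }

    static final String CONFIG = "long_poll.mode";

    private LongPollMode() {}

    // Parses the config value, defaulting to return_on_records.
    static Mode fromConfig(Properties props) {
        String raw = props.getProperty(CONFIG, "return_on_records");
        return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    // Should poll(timeout) return before the timeout expires?
    static boolean returnEarly(Mode mode, boolean recordsReceived, boolean fetchResponseReceived) {
        switch (mode) {
            case RETURN_ON_RECORDS:
                return recordsReceived;
            case RETURN_ON_RESPONSE:
                return recordsReceived || fetchResponseReceived;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```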

On Fri, Feb 5, 2021 at 3:18 PM Bill Bejeck  wrote:

> I meant to chime in earlier.
>
> I also like the `PollOptions` idea, but I have to agree that the config
> option would be the least disruptive approach.
>
> Thanks,
> Bill
>
> On Fri, Feb 5, 2021 at 6:12 PM John Roesler  wrote:
>
> > Thanks, all!
> >
> > It seems that the config I proposed is a solution that
> > everyone can be happy with, so I will go ahead with a PR to
> > fix that.
> >
> > I'll update the KIP after a round of PR reviews, in case
> > there are new concerns that arise.
> >
> > Thanks,
> > -John
> >
> > On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > > Thanks for providing more details.
> > >
> > > Adding a config might be the way of least resistance... I am fine with
> > > that.
> > >
> > > -Matthias
> > >
> > > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > > vvv
> > > > > long_poll.mode: return_on_records|return_on_response
> > > >
> > > > This idea LGTM. It not only makes minimal changes to current behavior
> > > > but also works for KIP-695.
> > > >
> > > > On 2021/02/04 16:07:11, John Roesler  wrote:
> > > > > Hi Matthias, Chia-Ping, and Tom,
> > > > >
> > > > > Thanks for the thoughtful replies!
> > > > >
> > > > > Re: poll(~forever~) to block indefinitely on records:
> > > > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > > > personally recommend for anyone to write code that blocks
> > > > > forever on I/O, I do agree this is something that "real
> > > > > people" may want to do.
> > > > >
> > > > > Just a note for the record, this approach should only be
> > > > > used in conjunction with a manual assignment. If people are
> > > > > using a group subscription, they're setting themselves up to
> > > > > get kicked out of the group when there is low volume of
> > > > > updates on the topic. And then, when they get kicked out,
> > > > > they will never know it because they're just going to be
> > > > > blocked in `poll()` the whole time.
> > > > >
> > > > > However, if you don't participate in a group and just:
> > > > > 1 assign(partitions)
> > > > > 2 poll(forever),
> > > > > you should indeed expect to return from poll only when you
> > > > > have records.
> > > > >
> > > > > This possibility is the turning point for me. I'd like to
> > > > > alter my proposal to an opt-in config, detailed below.
> > > > >
> > > > > Re: Javadoc:
> > > > > Thanks for pointing that out. It does seem like, if we do
> > > > > decide to change behavior, we should adjust the Javadoc to
> > > > > say so. That was an oversight on my part, and I daresay that
> > > > > if I had done that initially, it would have saved Rajini
> > > > > from having to dig into the code to pinpoint the cause of
> > > > > those test failures.
> > > > >
> > > > > Re: PollOptions:
> > > > > I actually like this option quite a bit. It seems like this
> > > > > would be warranted if we expect someone to want to use the
> > > > > same Consumer instance in both "return on metadata or
> > > > > records" and "return on only records" mode. Otherwise, we
> > > > > might as well introduce a new config.
> > > > >
> > > > > It also seems like the behavior I proposed in this KIP is
> > > > > somewhat "advanced", so I could certainly see leaving it off
> > > > > by default and offering an opt-in config.
> > > > >
> > > > > How does everyone feel about this opt-in config:
> > > > >
> > > > > vvv
> > > > > long_poll.mode: return_on_records|return_on_response
> > > > >
> > > > > doc:
> > > > > * return_on_records: (default) a call to
> > > > > Consumer#poll(timeout) will block up to the timeout and
> > > > > return early if records are received.
> > > > > * return_on_response: a call to Consumer#poll(timeout) will
> > > > > block up to the timeout and return early if any fetch
> > > > > response is received. Use this option to get updates from
> > > > > Consumer#metadata() even if Consumer#records() is empty.
> > > > > ^^^
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote:
> > > > > > Hi,
> > > > > >
> > > > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > > >
> > > > > > > * This method returns immediately if there are records available. *Otherwise,
> > > > > > > it will await the passed timeout.*
> > > > > > > * If the timeout expires, an empty record set will be returned. Note that
> > > > > > > this method may block beyond the
> > > > > > > * timeout in order to execute custom {@link ConsumerRebalanceListener}
> > > > > > > callbacks.
> > > > > > >
> > > > > >
> > > > > > In other words: If the method returns before the timeout there must be
> > > > > > records in the method result. After the timeout has passed there may be no
> > > > > > records. It might block for longer than the timeout. So I think
> > r

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-05 Thread Bill Bejeck
I meant to chime in earlier.

I also like the `PollOptions` idea, but I have to agree that the config
option would be the least disruptive approach.

Thanks,
Bill

On Fri, Feb 5, 2021 at 6:12 PM John Roesler  wrote:

> Thanks, all!
>
> It seems that the config I proposed is a solution that
> everyone can be happy with, so I will go ahead with a PR to
> fix that.
>
> I'll update the KIP after a round of PR reviews, in case
> there are new concerns that arise.
>
> Thanks,
> -John
>
> On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> > Thanks for providing more details.
> >
> > Adding a config might be the way of least resistance... I am fine with
> > that.
> >
> > -Matthias
> >
> > On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > > vvv
> > > > long_poll.mode: return_on_records|return_on_response
> > >
> > > This idea LGTM. It not only makes minimal changes to current behavior
> > > but also works for KIP-695.
> > >
> > > On 2021/02/04 16:07:11, John Roesler  wrote:
> > > > Hi Matthias, Chia-Ping, and Tom,
> > > >
> > > > Thanks for the thoughtful replies!
> > > >
> > > > Re: poll(~forever~) to block indefinitely on records:
> > > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > > personally recommend for anyone to write code that blocks
> > > > forever on I/O, I do agree this is something that "real
> > > > people" may want to do.
> > > >
> > > > Just a note for the record, this approach should only be
> > > > used in conjunction with a manual assignment. If people are
> > > > using a group subscription, they're setting themselves up to
> > > > get kicked out of the group when there is low volume of
> > > > updates on the topic. And then, when they get kicked out,
> > > > they will never know it because they're just going to be
> > > > blocked in `poll()` the whole time.
> > > >
> > > > However, if you don't participate in a group and just:
> > > > 1 assign(partitions)
> > > > 2 poll(forever),
> > > > you should indeed expect to return from poll only when you
> > > > have records.
> > > >
> > > > This possibility is the turning point for me. I'd like to
> > > > alter my proposal to an opt-in config, detailed below.
> > > >
> > > > Re: Javadoc:
> > > > Thanks for pointing that out. It does seem like, if we do
> > > > decide to change behavior, we should adjust the Javadoc to
> > > > say so. That was an oversight on my part, and I daresay that
> > > > if I had done that initially, it would have saved Rajini
> > > > from having to dig into the code to pinpoint the cause of
> > > > those test failures.
> > > >
> > > > Re: PollOptions:
> > > > I actually like this option quite a bit. It seems like this
> > > > would be warranted if we expect someone to want to use the
> > > > same Consumer instance in both "return on metadata or
> > > > records" and "return on only records" mode. Otherwise, we
> > > > might as well introduce a new config.
> > > >
> > > > It also seems like the behavior I proposed in this KIP is
> > > > somewhat "advanced", so I could certainly see leaving it off
> > > > by default and offering an opt-in config.
> > > >
> > > > How does everyone feel about this opt-in config:
> > > >
> > > > vvv
> > > > long_poll.mode: return_on_records|return_on_response
> > > >
> > > > doc:
> > > > * return_on_records: (default) a call to
> > > > Consumer#poll(timeout) will block up to the timeout and
> > > > return early if records are received.
> > > > * return_on_response: a call to Consumer#poll(timeout) will
> > > > block up to the timeout and return early if any fetch
> > > > response is received. Use this option to get updates from
> > > > Consumer#metadata() even if Consumer#records() is empty.
> > > > ^^^
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > > On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote:
> > > > > Hi,
> > > > >
> > > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > >
> > > > > > * This method returns immediately if there are records available. *Otherwise,
> > > > > > it will await the passed timeout.*
> > > > > > * If the timeout expires, an empty record set will be returned. Note that
> > > > > > this method may block beyond the
> > > > > > * timeout in order to execute custom {@link ConsumerRebalanceListener}
> > > > > > callbacks.
> > > > > >
> > > > >
> > > > > In other words: If the method returns before the timeout there must be
> > > > > records in the method result. After the timeout has passed there may be no
> > > > > records. It might block for longer than the timeout. So I think returning
> > > > > with empty records before at least the given timeout has passed breaks that
> > > > > contract.
> > > > >
> > > > > A not-much-prettier alternative to adding a new
> > > > > pollForRecordsOrMetadata(Duration) method could be overloading poll() to
> > > > > take an additional parameter which controlled whether an early return with
> > > > > empty records was allowed. Or a `

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-05 Thread John Roesler
Thanks, all!

It seems that the config I proposed is a solution that
everyone can be happy with, so I will go ahead with a PR to
fix that.

I'll update the KIP after a round of PR reviews, in case
there are new concerns that arise.

Thanks,
-John

On Fri, 2021-02-05 at 15:07 -0800, Matthias J. Sax wrote:
> Thanks for providing more details.
> 
> Adding a config might be the path of least resistance... I am fine with that.
> 
> -Matthias
> 
> On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
> > > vvv
> > > long_poll.mode: return_on_records|return_on_response
> > 
> > This idea LGTM. It not only makes minimum changes to current behavior but 
> > also works for KIP-695. 
> > 
> > On 2021/02/04 16:07:11, John Roesler  wrote: 
> > > Hi Matthias, Chia-Ping, and Tom,
> > > 
> > > Thanks for the thoughtful replies!
> > > 
> > > Re: poll(~forever~) to block indefinitely on records:
> > > Thanks for your diligence, Chia-Ping. While I wouldn't
> > > personally recommend for anyone to write code that blocks
> > > forever on I/O, I do agree this is something that "real
> > > people" may want to do.
> > > 
> > > Just a note for the record, this approach should only be
> > > used in conjunction with a manual assignment. If people are
> > > using a group subscription, they're setting themselves up to
> > > get kicked out of the group when there is low volume of
> > > updates on the topic. And then, when they get kicked out,
> > > they will never know it because they're just going to be
> > > blocked in `poll()` the whole time.
> > > 
> > > However, if you don't participate in a group and just:
> > > 1 assign(partitions)
> > > 2 poll(forever),
> > > you should indeed expect to return from poll only when you
> > > have records.
> > > 
> > > This possibility is the turning point for me. I'd like to
> > > alter my proposal to an opt-in config, detailed below.
> > > 
> > > Re: Javadoc:
> > > Thanks for pointing that out. It does seem like, if we do
> > > decide to change behavior, we should adjust the Javadoc to
> > > say so. That was an oversight on my part, and I daresay that
> > > if I had done that initially, it would have saved Rajini
> > > from having to dig into the code to pinpoint the cause of
> > > those test failures.
> > > 
> > > Re: PollOptions:
> > > I actually like this option quite a bit. It seems like this
> > > would be warranted if we expect someone to want to use the
> > > same Consumer instance in both "return on metadata or
> > > records" and "return on only records" mode. Otherwise, we
> > > might as well introduce a new config.
> > > 
> > > It also seems like the behavior I proposed in this KIP is
> > > somewhat "advanced", so I could certainly see leaving it off
> > > by default and offering an opt-in config.
> > > 
> > > How does everyone feel about this opt-in config:
> > > 
> > > vvv
> > > long_poll.mode: return_on_records|return_on_response
> > > 
> > > doc:
> > > * return_on_records: (default) a call to
> > > Consumer#poll(timeout) will block up to the timeout and
> > > return early if records are received.
> > > * return_on_response: a call to Consumer#poll(timeout) will
> > > block up to the timeout and return early if any fetch
> > > response is received. Use this option to get updates from
> > > Consumer#metadata() even if Consumer#records() is empty.
> > > ^^^
> > > 
> > > Thanks,
> > > John
> > > 
> > > On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote:
> > > > Hi,
> > > > 
> > > > The Javadoc for KafkaConsumer#poll() includes the following:
> > > > 
> > > > > * This method returns immediately if there are records available. *Otherwise,
> > > > > it will await the passed timeout.*
> > > > > * If the timeout expires, an empty record set will be returned. Note that
> > > > > this method may block beyond the
> > > > > * timeout in order to execute custom {@link ConsumerRebalanceListener}
> > > > > callbacks.
> > > > > 
> > > > 
> > > > In other words: If the method returns before the timeout there must be
> > > > records in the method result. After the timeout has passed there may be no
> > > > records. It might block for longer than the timeout. So I think returning
> > > > with empty records before at least the given timeout has passed breaks that
> > > > contract.
> > > > 
> > > > A not-much-prettier alternative to adding a new
> > > > pollForRecordsOrMetadata(Duration) method could be overloading poll() to
> > > > take an additional parameter which controlled whether an early return with
> > > > empty records was allowed. Or a `poll(PollOptions)`. In the long run it
> > > > could be a mistake to include in the method name exactly what might cause
> > > > an early empty return.
> > > > 
> > > > Kind regards,
> > > > 
> > > > Tom
> > > > 
> > > > 
> > > > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai  wrote:
> > > > 
> > > > > Thanks for your sharing Matthias. I agree that is inde

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-05 Thread Matthias J. Sax
Thanks for providing more details.

Adding a config might be the path of least resistance... I am fine with that.

-Matthias

On 2/4/21 9:42 AM, Chia-Ping Tsai wrote:
>> vvv
>> long_poll.mode: return_on_records|return_on_response
> 
> This idea LGTM. It not only makes minimum changes to current behavior but 
> also works for KIP-695. 
> 
> On 2021/02/04 16:07:11, John Roesler  wrote: 
>> Hi Matthias, Chia-Ping, and Tom,
>>
>> Thanks for the thoughtful replies!
>>
>> Re: poll(~forever~) to block indefinitely on records:
>> Thanks for your diligence, Chia-Ping. While I wouldn't
>> personally recommend for anyone to write code that blocks
>> forever on I/O, I do agree this is something that "real
>> people" may want to do.
>>
>> Just a note for the record, this approach should only be
>> used in conjunction with a manual assignment. If people are
>> using a group subscription, they're setting themselves up to
>> get kicked out of the group when there is low volume of
>> updates on the topic. And then, when they get kicked out,
>> they will never know it because they're just going to be
>> blocked in `poll()` the whole time.
>>
>> However, if you don't participate in a group and just:
>> 1 assign(partitions)
>> 2 poll(forever),
>> you should indeed expect to return from poll only when you
>> have records.
>>
>> This possibility is the turning point for me. I'd like to
>> alter my proposal to an opt-in config, detailed below.
>>
>> Re: Javadoc:
>> Thanks for pointing that out. It does seem like, if we do
>> decide to change behavior, we should adjust the Javadoc to
>> say so. That was an oversight on my part, and I daresay that
>> if I had done that initially, it would have saved Rajini
>> from having to dig into the code to pinpoint the cause of
>> those test failures.
>>
>> Re: PollOptions:
>> I actually like this option quite a bit. It seems like this
>> would be warranted if we expect someone to want to use the
>> same Consumer instance in both "return on metadata or
>> records" and "return on only records" mode. Otherwise, we
>> might as well introduce a new config.
>>
>> It also seems like the behavior I proposed in this KIP is
>> somewhat "advanced", so I could certainly see leaving it off
>> by default and offering an opt-in config.
>>
>> How does everyone feel about this opt-in config:
>>
>> vvv
>> long_poll.mode: return_on_records|return_on_response
>>
>> doc:
>> * return_on_records: (default) a call to
>> Consumer#poll(timeout) will block up to the timeout and
>> return early if records are received.
>> * return_on_response: a call to Consumer#poll(timeout) will
>> block up to the timeout and return early if any fetch
>> response is received. Use this option to get updates from
>> Consumer#metadata() even if Consumer#records() is empty.
>> ^^^
>>
>> Thanks,
>> John
>>
>> On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote:
>>> Hi,
>>>
>>> The Javadoc for KafkaConsumer#poll() includes the following:
>>>
>>>> * This method returns immediately if there are records available. *Otherwise,
>>>> it will await the passed timeout.*
>>>> * If the timeout expires, an empty record set will be returned. Note that
>>>> this method may block beyond the
>>>> * timeout in order to execute custom {@link ConsumerRebalanceListener}
>>>> callbacks.
>>>>
>>>
>>> In other words: If the method returns before the timeout there must be
>>> records in the method result. After the timeout has passed there may be no
>>> records. It might block for longer than the timeout. So I think returning
>>> with empty records before at least the given timeout has passed breaks that
>>> contract.
>>>
>>> A not-much-prettier alternative to adding a new
>>> pollForRecordsOrMetadata(Duration) method could be overloading poll() to
>>> take an additional parameter which controlled whether an early return with
>>> empty records was allowed. Or a `poll(PollOptions)`. In the long run it
>>> could be a mistake to include in the method name exactly what might cause
>>> an early empty return.
>>>
>>> Kind regards,
>>>
>>> Tom
>>>
>>>
>>> On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai  wrote:
>>>
>>>> Thanks for your sharing Matthias. I agree that is indeed an anti-pattern
>>>> to assume poll() returns data or not.
>>>>
>>>> However, I check all usages of poll() in code base. There is an
>>>> interesting use case - poll(a bigger timeout) - it implies that callers
>>>> want to block poll()(forever) unless there are available data.
>>>>
>>>> [1]
>>>> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
>>>> [2]
>>>> https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
>>>>
>>>> Hence, I start to worry client code like aforementioned cases get broken
>>>> due to behavior change :(
>>>>
>>>> On 2021/02/03 22:59:09, "Matthias J. Sax"  wrote:
>>>>> Thanks for your email John.
>>>>>
>>>>> I agree tha

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-04 Thread Chia-Ping Tsai
> vvv
> long_poll.mode: return_on_records|return_on_response

This idea LGTM. It makes minimal changes to the current behavior and also
works for KIP-695.

On 2021/02/04 16:07:11, John Roesler  wrote: 
> Hi Matthias, Chia-Ping, and Tom,
> 
> Thanks for the thoughtful replies!
> 
> Re: poll(~forever~) to block indefinitely on records:
> Thanks for your diligence, Chia-Ping. While I wouldn't
> personally recommend for anyone to write code that blocks
> forever on I/O, I do agree this is something that "real
> people" may want to do.
> 
> Just a note for the record, this approach should only be
> used in conjunction with a manual assignment. If people are
> using a group subscription, they're setting themselves up to
> get kicked out of the group when there is low volume of
> updates on the topic. And then, when they get kicked out,
> they will never know it because they're just going to be
> blocked in `poll()` the whole time.
> 
> However, if you don't participate in a group and just:
> 1 assign(partitions)
> 2 poll(forever),
> you should indeed expect to return from poll only when you
> have records.
> 
> This possibility is the turning point for me. I'd like to
> alter my proposal to an opt-in config, detailed below.
> 
> Re: Javadoc:
> Thanks for pointing that out. It does seem like, if we do
> decide to change behavior, we should adjust the Javadoc to
> say so. That was an oversight on my part, and I daresay that
> if I had done that initially, it would have saved Rajini
> from having to dig into the code to pinpoint the cause of
> those test failures.
> 
> Re: PollOptions:
> I actually like this option quite a bit. It seems like this
> would be warranted if we expect someone to want to use the
> same Consumer instance in both "return on metadata or
> records" and "return on only records" mode. Otherwise, we
> might as well introduce a new config.
> 
> It also seems like the behavior I proposed in this KIP is
> somewhat "advanced", so I could certainly see leaving it off
> by default and offering an opt-in config.
> 
> How does everyone feel about this opt-in config:
> 
> vvv
> long_poll.mode: return_on_records|return_on_response
> 
> doc:
> * return_on_records: (default) a call to
> Consumer#poll(timeout) will block up to the timeout and
> return early if records are received.
> * return_on_response: a call to Consumer#poll(timeout) will
> block up to the timeout and return early if any fetch
> response is received. Use this option to get updates from
> Consumer#metadata() even if Consumer#records() is empty.
> ^^^
> 
> Thanks,
> John
> 
> On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote:
> > Hi,
> > 
> > The Javadoc for KafkaConsumer#poll() includes the following:
> > 
> > > * This method returns immediately if there are records available. *Otherwise,
> > > it will await the passed timeout.*
> > > * If the timeout expires, an empty record set will be returned. Note that
> > > this method may block beyond the
> > > * timeout in order to execute custom {@link ConsumerRebalanceListener}
> > > callbacks.
> > > 
> > 
> > In other words: If the method returns before the timeout there must be
> > records in the method result. After the timeout has passed there may be no
> > records. It might block for longer than the timeout. So I think returning
> > with empty records before at least the given timeout has passed breaks that
> > contract.
> > 
> > A not-much-prettier alternative to adding a new
> > pollForRecordsOrMetadata(Duration) method could be overloading poll() to
> > take an additional parameter which controlled whether an early return with
> > empty records was allowed. Or a `poll(PollOptions)`. In the long run it
> > could be a mistake to include in the method name exactly what might cause
> > an early empty return.
> > 
> > Kind regards,
> > 
> > Tom
> > 
> > 
> > On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai  wrote:
> > 
> > > Thanks for your sharing Matthias. I agree that is indeed an anti-pattern
> > > to assume poll() returns data or not.
> > > 
> > > However, I check all usages of poll() in code base. There is an
> > > interesting use case - poll(a bigger timeout) - it implies that callers
> > > want to block poll()(forever) unless there are available data.
> > > 
> > > [1]
> > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> > > [2]
> > > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
> > > 
> > > Hence, I start to worry client code like aforementioned cases get broken
> > > due to behavior change :(
> > > 
> > > On 2021/02/03 22:59:09, "Matthias J. Sax"  wrote:
> > > > Thanks for your email John.
> > > > 
> > > > I agree that it seems to be an anti-pattern to write code that makes
> > > > assumptions if poll() returns data or not. Thus, we should fix-forward
> > > > the system test from my point of view.
> 

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-04 Thread John Roesler
Hi Matthias, Chia-Ping, and Tom,

Thanks for the thoughtful replies!

Re: poll(~forever~) to block indefinitely on records:
Thanks for your diligence, Chia-Ping. While I wouldn't
personally recommend that anyone write code that blocks
forever on I/O, I do agree this is something that "real
people" may want to do.

Just a note for the record, this approach should only be
used in conjunction with a manual assignment. If people are
using a group subscription, they're setting themselves up to
get kicked out of the group when there is a low volume of
updates on the topic. And then, when they get kicked out,
they will never know it because they're just going to be
blocked in `poll()` the whole time.

However, if you don't participate in a group and just:
1. assign(partitions)
2. poll(forever)
then you should indeed expect to return from poll() only
when you have records.

This possibility is the turning point for me. I'd like to
alter my proposal to an opt-in config, detailed below.

Re: Javadoc:
Thanks for pointing that out. It does seem like, if we do
decide to change behavior, we should adjust the Javadoc to
say so. That was an oversight on my part, and I daresay that
if I had done that initially, it would have saved Rajini
from having to dig into the code to pinpoint the cause of
those test failures.

Re: PollOptions:
I actually like this option quite a bit. It seems like this
would be warranted if we expect someone to want to use the
same Consumer instance in both "return on metadata or
records" and "return on only records" modes. Otherwise, we
might as well introduce a new config.

It also seems like the behavior I proposed in this KIP is
somewhat "advanced", so I could certainly see leaving it off
by default and offering an opt-in config.

How does everyone feel about this opt-in config:

vvv
long_poll.mode: return_on_records|return_on_response

doc:
* return_on_records: (default) a call to
Consumer#poll(timeout) will block up to the timeout and
return early if records are received.
* return_on_response: a call to Consumer#poll(timeout) will
block up to the timeout and return early if any fetch
response is received. Use this option to get updates from
Consumer#metadata() even if Consumer#records() is empty.
^^^
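The two proposed modes differ only in when poll() may return before its timeout. A minimal sketch of that decision, assuming the semantics in the doc text above; `LongPollModeSketch` and `mayReturn` are hypothetical names, and `long_poll.mode` is only a proposal in this thread, not an existing consumer config:

```java
import java.time.Duration;

// Toy model (not Kafka client code) of the two values proposed for the
// hypothetical long_poll.mode config. It answers one question only: given
// what has happened so far inside poll(timeout), may poll() return now?
final class LongPollModeSketch {

    enum LongPollMode { RETURN_ON_RECORDS, RETURN_ON_RESPONSE }

    static boolean mayReturn(LongPollMode mode, Duration elapsed, Duration timeout,
                             boolean gotResponse, boolean gotRecords) {
        if (elapsed.compareTo(timeout) >= 0) {
            // Timeout expired: both modes return, possibly with no records.
            return true;
        }
        if (gotRecords) {
            // Records always permit an early return.
            return true;
        }
        // An early return without records is allowed only in return_on_response
        // mode, and only once some fetch response has arrived.
        return mode == LongPollMode.RETURN_ON_RESPONSE && gotResponse;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30);
        Duration elapsed = Duration.ofMillis(500);
        // An empty fetch response arrives 500 ms into a 30 s poll.
        System.out.println(mayReturn(LongPollMode.RETURN_ON_RECORDS, elapsed, timeout, true, false));  // false
        System.out.println(mayReturn(LongPollMode.RETURN_ON_RESPONSE, elapsed, timeout, true, false)); // true
    }
}
```

With the default, a caller that does assign(partitions) followed by a very long poll still only wakes up for records or the timeout.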

Thanks,
John

On Thu, 2021-02-04 at 08:44 +, Tom Bentley wrote:
> Hi,
> 
> The Javadoc for KafkaConsumer#poll() includes the following:
> 
> > * This method returns immediately if there are records available. *Otherwise,
> > it will await the passed timeout.*
> > * If the timeout expires, an empty record set will be returned. Note that
> > this method may block beyond the
> > * timeout in order to execute custom {@link ConsumerRebalanceListener}
> > callbacks.
> > 
> 
> In other words: If the method returns before the timeout there must be
> records in the method result. After the timeout has passed there may be no
> records. It might block for longer than the timeout. So I think returning
> with empty records before at least the given timeout has passed breaks that
> contract.
> 
> A not-much-prettier alternative to adding a new
> pollForRecordsOrMetadata(Duration) method could be overloading poll() to
> take an additional parameter which controlled whether an early return with
> empty records was allowed. Or a `poll(PollOptions)`. In the long run it
> could be a mistake to include in the method name exactly what might cause
> an early empty return.
> 
> Kind regards,
> 
> Tom
> 
> 
> On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai  wrote:
> 
> > Thanks for your sharing Matthias. I agree that is indeed an anti-pattern
> > to assume poll() returns data or not.
> > 
> > However, I check all usages of poll() in code base. There is an
> > interesting use case - poll(a bigger timeout) - it implies that callers
> > want to block poll()(forever) unless there are available data.
> > 
> > [1]
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> > [2]
> > https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
> > 
> > Hence, I start to worry client code like aforementioned cases get broken
> > due to behavior change :(
> > 
> > On 2021/02/03 22:59:09, "Matthias J. Sax"  wrote:
> > > Thanks for your email John.
> > > 
> > > I agree that it seems to be an anti-pattern to write code that makes
> > > assumptions if poll() returns data or not. Thus, we should fix-forward
> > > the system test from my point of view.
> > > 
> > > From my understanding, the impact of KIP-695 is that we might return
> > > early from poll() (ie, before the timeout passed) with no data, only if
> > > an empty fetch request comes back and there is no other fetch request
> > > that did return data. Thus, for most cases, poll() should still return
> > > early and provide data. -- Thus, I have no concerns with the slight
> > > behavior change.
> > > 
> > > Would be good to get input from others about 

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-04 Thread Tom Bentley
Hi,

The Javadoc for KafkaConsumer#poll() includes the following:

> * This method returns immediately if there are records available. *Otherwise,
> it will await the passed timeout.*
> * If the timeout expires, an empty record set will be returned. Note that
> this method may block beyond the
> * timeout in order to execute custom {@link ConsumerRebalanceListener}
> callbacks.
>

In other words: If the method returns before the timeout there must be
records in the method result. After the timeout has passed there may be no
records. It might block for longer than the timeout. So I think returning
with empty records before at least the given timeout has passed breaks that
contract.
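Tom's reading of the Javadoc can be stated as a single predicate. This is an illustrative sketch of the contract as he describes it (the class name `PollContract` is hypothetical), not code from the Kafka client:

```java
import java.time.Duration;

// Encodes the contract as read from the KafkaConsumer#poll() Javadoc:
// returning before the timeout implies a non-empty result; at or after the
// timeout (possibly later, because of rebalance callbacks) empty is fine.
final class PollContract {

    static boolean violatesContract(Duration elapsed, Duration timeout, int recordCount) {
        boolean returnedEarly = elapsed.compareTo(timeout) < 0;
        return returnedEarly && recordCount == 0;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(10);
        System.out.println(violatesContract(Duration.ofSeconds(2), timeout, 5));  // false: early, with records
        System.out.println(violatesContract(Duration.ofSeconds(12), timeout, 0)); // false: timed out, empty
        System.out.println(violatesContract(Duration.ofSeconds(2), timeout, 0));  // true: early and empty
    }
}
```

An early return carrying only fetch metadata is exactly the third case.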

A not-much-prettier alternative to adding a new
pollForRecordsOrMetadata(Duration) method could be overloading poll() to
take an additional parameter which controlled whether an early return with
empty records was allowed. Or a `poll(PollOptions)`. In the long run it
could be a mistake to include in the method name exactly what might cause
an early empty return.
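One possible shape for the `poll(PollOptions)` alternative, as a sketch only: `PollOptions`, `withTimeout`, and `allowEarlyEmptyReturn` are hypothetical names, and no such overload exists on the Kafka `Consumer` interface. The option is named after the permitted outcome (an early empty return) rather than its cause, in line with the concern about baking the cause into the method name:

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical options object for a poll(PollOptions) overload. Neither this
// class nor such an overload exists in the Kafka consumer API.
final class PollOptions {
    private final Duration timeout;
    private final boolean allowsEarlyEmptyReturn;

    private PollOptions(Duration timeout, boolean allowsEarlyEmptyReturn) {
        this.timeout = Objects.requireNonNull(timeout, "timeout");
        this.allowsEarlyEmptyReturn = allowsEarlyEmptyReturn;
    }

    // Default keeps the currently documented contract: no early empty return.
    static PollOptions withTimeout(Duration timeout) {
        return new PollOptions(timeout, false);
    }

    // Returns a copy that may also return before the timeout with no records,
    // for example when only fetch metadata has arrived.
    PollOptions allowEarlyEmptyReturn() {
        return new PollOptions(timeout, true);
    }

    Duration timeout() {
        return timeout;
    }

    boolean earlyEmptyReturnAllowed() {
        return allowsEarlyEmptyReturn;
    }

    public static void main(String[] args) {
        PollOptions strict = PollOptions.withTimeout(Duration.ofSeconds(5));
        PollOptions relaxed = strict.allowEarlyEmptyReturn();
        System.out.println(strict.earlyEmptyReturnAllowed() + " " + relaxed.earlyEmptyReturnAllowed()); // false true
    }
}
```

A caller would then opt in per call, e.g. `consumer.poll(PollOptions.withTimeout(Duration.ofSeconds(5)).allowEarlyEmptyReturn())`, where that `poll` overload is likewise hypothetical.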

Kind regards,

Tom


On Thu, Feb 4, 2021 at 5:08 AM Chia-Ping Tsai  wrote:

> Thanks for your sharing Matthias. I agree that is indeed an anti-pattern
> to assume poll() returns data or not.
>
> However, I check all usages of poll() in code base. There is an
> interesting use case - poll(a bigger timeout) - it implies that callers
> want to block poll()(forever) unless there are available data.
>
> [1]
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
> [2]
> https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232
>
> Hence, I start to worry client code like aforementioned cases get broken
> due to behavior change :(
>
> On 2021/02/03 22:59:09, "Matthias J. Sax"  wrote:
> > Thanks for your email John.
> >
> > I agree that it seems to be an anti-pattern to write code that makes
> > assumptions if poll() returns data or not. Thus, we should fix-forward
> > the system test from my point of view.
> >
> > From my understanding, the impact of KIP-695 is that we might return
> > early from poll() (ie, before the timeout passed) with no data, only if
> > an empty fetch request comes back and there is no other fetch request
> > that did return data. Thus, for most cases, poll() should still return
> > early and provide data. -- Thus, I have no concerns with the slight
> > behavior change.
> >
> > Would be good to get input from others about this question though.
> >
> >
> > -Matthias
> >
> >
> > On 2/3/21 10:06 AM, John Roesler wrote:
> > > Hello again all,
> > >
> > > I'm resurrecting this thread to discuss an issue that has
> > > come up after merging the code for this KIP.
> > >
> > > The issue is that some of the system tests need to be
> > > updated in the same way that this integration test needed to
> > > be updated:
> > >
> > > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> > >
> > > This issue was reported here:
> > > https://issues.apache.org/jira/browse/KAFKA-12268
> > > and there is some preliminary discussion here:
> > > https://github.com/apache/kafka/pull/10022
> > >
> > > First, let me offer my apologies for failing to catch this
> > > before the merge. I'm sorry that it became Rajini's work to
> > > track down the cause of the failure, when it was my
> > > responsibility to ensure the feature was merged safely.
> > >
> > > To recap the situation:
> > > Consumer#poll(Duration) will now return before the duration
> > > expires even if there are no records returned if there is
> > > some returned metadata.
> > >
> > > This behavior was important for KIP-695. In the situation
> > > where we get no records back for some partition, Streams
> > > needs to have the freshest possible information about
> > > whether  there are no new records on the broker, or whether
> > > there are records on the broker that we still need to fetch.
> > > If that's not clear, the KIP contains the full story.
> > >
> > > It's definitely a behavior change, but our rationale was
> > > that it's an acceptable behavior change. Our big alternative
> > > is to add a _new_ method to Consumer to
> > > pollForRecordsOrMetadata(Duration) or something.
> > >
> > > It seems unreliable to expect the broker to return a
> > > particular record within a particular timeout in general,
> > > which is what these tests are doing. The broker can decide
> > > for several reasons not to return data for a partition, but
> > > return data for another partition instead.
> > >
> > > It seems like the only case where you might reasonably try
> > > to rely on that is in a test, where you first write a record
> > > to a partition, then you assign only that one partition to a
> > > consumer, then you poll on the consumer, expecting it to
> > > return the data you just wrote.
> > >
> > > So the $10 question here is whether we should 

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-03 Thread Chia-Ping Tsai
Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern to
assume whether or not poll() returns data.

However, I checked all usages of poll() in the code base. There is an
interesting use case, poll(a bigger timeout), which implies that callers want
to block in poll() (forever) until data is available.

[1] 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
[2] 
https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232

Hence, I'm starting to worry that client code like the aforementioned cases
will break due to the behavior change :(
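The concern can be illustrated with a toy model: a caller that polls once with a very large timeout and treats an empty result as a timeout breaks as soon as poll() may return early and empty, while a caller that keeps polling does not. The class and method names are hypothetical, a `Supplier` stands in for `consumer.poll(...)`, and this is not the actual ConsoleConsumer or VerifiableConsumer code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Toy model of two callers; the Supplier stands in for consumer.poll(...).
final class EmptyPollCallers {

    // Fragile: correct only if poll() never returns early with no records.
    static <T> List<T> receiveOnce(Supplier<List<T>> poll) throws TimeoutException {
        List<T> batch = poll.get();
        if (batch.isEmpty()) {
            throw new TimeoutException("poll returned no records");
        }
        return batch;
    }

    // Tolerant: keeps polling across empty results, up to maxPolls attempts.
    static <T> List<T> receiveTolerant(Supplier<List<T>> poll, int maxPolls) throws TimeoutException {
        for (int i = 0; i < maxPolls; i++) {
            List<T> batch = poll.get();
            if (!batch.isEmpty()) {
                return batch;
            }
        }
        throw new TimeoutException("no records after " + maxPolls + " polls");
    }

    // Scripted poll results: first an early empty return, then one record.
    static Supplier<List<String>> scripted() {
        Deque<List<String>> results = new ArrayDeque<>();
        results.add(List.of());
        results.add(List.of("record-1"));
        return () -> results.isEmpty() ? List.of() : results.poll();
    }

    public static void main(String[] args) throws TimeoutException {
        try {
            receiveOnce(scripted());
        } catch (TimeoutException e) {
            System.out.println("fragile caller failed: " + e.getMessage());
        }
        System.out.println(receiveTolerant(scripted(), 10)); // [record-1]
    }
}
```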

On 2021/02/03 22:59:09, "Matthias J. Sax"  wrote: 
> Thanks for your email John.
> 
> I agree that it seems to be an anti-pattern to write code that makes
> assumptions if poll() returns data or not. Thus, we should fix-forward
> the system test from my point of view.
> 
> From my understanding, the impact of KIP-695 is that we might return
> early from poll() (ie, before the timeout passed) with no data, only if
> an empty fetch request comes back and there is no other fetch request
> that did return data. Thus, for most cases, poll() should still return
> early and provide data. -- Thus, I have no concerns with the slight
> behavior change.
> 
> Would be good to get input from others about this question though.
> 
> 
> -Matthias
> 
> 
> On 2/3/21 10:06 AM, John Roesler wrote:
> > Hello again all,
> > 
> > I'm resurrecting this thread to discuss an issue that has
> > come up after merging the code for this KIP.
> > 
> > The issue is that some of the system tests need to be
> > updated in the same way that this integration test needed to
> > be updated:
> > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> > 
> > This issue was reported here:
> > https://issues.apache.org/jira/browse/KAFKA-12268
> > and there is some preliminary discussion here:
> > https://github.com/apache/kafka/pull/10022
> > 
> > First, let me offer my apologies for failing to catch this
> > before the merge. I'm sorry that it became Rajini's work to
> > track down the cause of the failure, when it was my
> > responsibility to ensure the feature was merged safely.
> > 
> > To recap the situation:
> > Consumer#poll(Duration) will now return before the duration
> > expires even if there are no records returned if there is
> > some returned metadata.
> > 
> > This behavior was important for KIP-695. In the situation
> > where we get no records back for some partition, Streams
> > needs to have the freshest possible information about
> > whether  there are no new records on the broker, or whether
> > there are records on the broker that we still need to fetch.
> > If that's not clear, the KIP contains the full story.
> > 
> > It's definitely a behavior change, but our rationale was
> > that it's an acceptable behavior change. Our big alternative
> > is to add a _new_ method to Consumer to
> > pollForRecordsOrMetadata(Duration) or something.
> > 
> > It seems unreliable to expect the broker to return a
> > particular record within a particular timeout in general,
> > which is what these tests are doing. The broker can decide
> > for several reasons not to return data for a partition, but
> > return data for another partition instead.
> > 
> > It seems like the only case where you might reasonably try
> > to rely on that is in a test, where you first write a record
> > to a partition, then you assign only that one partition to a
> > consumer, then you poll on the consumer, expecting it to
> > return the data you just wrote.
> > 
> > So the $10 question here is whether we should support this
> > apparently artificial (testing-only) use case to the point
> > where it's worth adding a whole new method to the Consumer
> > interface.
> > 
> > Thanks all,
> > John
> > 
> > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> >> Thanks Jason,
> >>
> >> We would only return the metadata for the latest fetches.
> >> So, if someone wanted to use this to lazily maintain a
> >> client-side metadata map for all partitions, they'd have to
> >> store it separately and merge in new updates as they arrive.
> >>
> >> This way:
> >> 1. We don't need to increase the complexity of the client by
> >> storing that metadata
> >> 2. Users will be able to treat all returned metadata as
> >> "fresh" without having to reason about the timestamps.
> >> 3. All parts of the returned ConsumerRecords object have the
> >> same lifecycle: all the data and metadata are the results of
> >> the most recent round of fetch responses that had not been
> >> previously polled.
> >>
> >> Does that seem sensible to you? I'll update the KIP to
> >> clarify this.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> >>> Hi John,
> >>>
> >>> Just one question. It wasn't very clear to me exactly when the metadata
> >>> would be re

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-03 Thread Matthias J. Sax
Thanks for your email John.

I agree that it seems to be an anti-pattern to write code that makes
assumptions about whether poll() returns data or not. Thus, we should fix-forward
the system test from my point of view.

From my understanding, the impact of KIP-695 is that we might return
early from poll() (i.e., before the timeout has passed) with no data only if
an empty fetch response comes back and no other fetch response
returned data. Thus, in most cases, poll() should still return
early and provide data. So, I have no concerns with the slight
behavior change.

Would be good to get input from others about this question though.


-Matthias


On 2/3/21 10:06 AM, John Roesler wrote:
> Hello again all,
> 
> I'm resurrecting this thread to discuss an issue that has
> come up after merging the code for this KIP.
> 
> The issue is that some of the system tests need to be
> updated in the same way that this integration test needed to
> be updated:
> https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> 
> This issue was reported here:
> https://issues.apache.org/jira/browse/KAFKA-12268
> and there is some preliminary discussion here:
> https://github.com/apache/kafka/pull/10022
> 
> First, let me offer my apologies for failing to catch this
> before the merge. I'm sorry that it became Rajini's work to
> track down the cause of the failure, when it was my
> responsibility to ensure the feature was merged safely.
> 
> To recap the situation:
> Consumer#poll(Duration) will now return before the duration
> expires, even if it returns no records, as long as it has
> some metadata to return.
> 
> This behavior was important for KIP-695. In the situation
> where we get no records back for some partition, Streams
> needs to have the freshest possible information about
> whether there are no new records on the broker, or whether
> there are records on the broker that we still need to fetch.
> If that's not clear, the KIP contains the full story.
> 
> It's definitely a behavior change, but our rationale was
> that it's an acceptable behavior change. Our big alternative
> is to add a _new_ method to Consumer to
> pollForRecordsOrMetadata(Duration) or something.
> 
> It seems unreliable to expect the broker to return a
> particular record within a particular timeout in general,
> which is what these tests are doing. The broker can decide
> for several reasons not to return data for a partition, but
> return data for another partition instead.
> 
> It seems like the only case where you might reasonably try
> to rely on that is in a test, where you first write a record
> to a partition, then you assign only that one partition to a
> consumer, then you poll on the consumer, expecting it to
> return the data you just wrote.
> 
> So the $10 question here is whether we should support this
> apparently artificial (testing-only) use case to the point
> where it's worth adding a whole new method to the Consumer
> interface.
> 
> Thanks all,
> John
> 
> On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
>> Thanks Jason,
>>
>> We would only return the metadata for the latest fetches.
>> So, if someone wanted to use this to lazily maintain a
>> client-side metadata map for all partitions, they'd have to
>> store it separately and merge in new updates as they arrive.
>>
>> This way:
>> 1. We don't need to increase the complexity of the client by
>> storing that metadata
>> 2. Users will be able to treat all returned metadata as
>> "fresh" without having to reason about the timestamps.
>> 3. All parts of the returned ConsumerRecords object have the
>> same lifecycle: all the data and metadata are the results of
>> the most recent round of fetch responses that had not been
>> previously polled.
>>
>> Does that seem sensible to you? I'll update the KIP to
>> clarify this.
>>
>> Thanks,
>> -John
>>
>> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
>>> Hi John,
>>>
>>> Just one question. It wasn't very clear to me exactly when the metadata
>>> would be returned in `ConsumerRecords`. Would we /always/ include the
>>> metadata for all partitions that are assigned, or would it be based on the
>>> latest fetches?
>>>
>>> Thanks,
>>> Jason
>>>
>>> On Fri, Dec 11, 2020 at 4:07 PM John Roesler  wrote:
>>>
 Thanks, Guozhang!

 All of your feedback sounds good to me. I’ll update the KIP when I am able.

 3) I believe it is the position after the fetch, but I will confirm. I
 think omitting position may render beginning and end offsets useless as
 well, which leaves only lag. That would be fine with me, but it also seems
 nice to supply this extra metadata since it is well defined and probably
 handy for others. Therefore, I’d go the route of specifying the exact
 semantics and keeping it.

 Thanks for the review,
 John

 On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> Hello John,
>>

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-02-03 Thread John Roesler
Hello again all,

I'm resurrecting this thread to discuss an issue that has
come up after merging the code for this KIP.

The issue is that some of the system tests need to be
updated in the same way that this integration test needed to
be updated:
https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888

This issue was reported here:
https://issues.apache.org/jira/browse/KAFKA-12268
and there is some preliminary discussion here:
https://github.com/apache/kafka/pull/10022

First, let me offer my apologies for failing to catch this
before the merge. I'm sorry that it became Rajini's work to
track down the cause of the failure, when it was my
responsibility to ensure the feature was merged safely.

To recap the situation:
Consumer#poll(Duration) will now return before the duration
expires, even if it returns no records, as long as it has
some metadata to return.
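The recap above can be modelled as a small decision function. This is a hypothetical sketch, not the `KafkaConsumer` implementation: `PollReturnRule` and `shouldReturn` are illustrative names, per-partition metadata is simplified to a lag value keyed by partition name, and the consumer's real fetch scheduling is left out. It only encodes the rule that either records or metadata now end a poll early.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of when poll(Duration) may return under KIP-695.
 * Not the KafkaConsumer code; it only encodes the rule from this thread.
 */
final class PollReturnRule {
    private PollReturnRule() {}

    /**
     * @param records    records gathered from completed fetches so far
     * @param metadata   assumed simplification: partition name -> lag from those fetches
     * @param nowMs      current time in milliseconds
     * @param deadlineMs time at which the poll duration expires
     */
    static boolean shouldReturn(List<String> records,
                                Map<String, Long> metadata,
                                long nowMs,
                                long deadlineMs) {
        if (!records.isEmpty()) {
            return true;            // unchanged: returned data ends the poll early
        }
        if (!metadata.isEmpty()) {
            return true;            // new: metadata alone also ends the poll early
        }
        return nowMs >= deadlineMs; // otherwise wait out the full duration
    }
}
```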

This behavior was important for KIP-695. In the situation
where we get no records back for some partition, Streams
needs to have the freshest possible information about
whether there are no new records on the broker, or whether
there are records on the broker that we still need to fetch.
If that's not clear, the KIP contains the full story.

It's definitely a behavior change, but our rationale was
that it's an acceptable behavior change. Our big alternative
is to add a _new_ method to Consumer to
pollForRecordsOrMetadata(Duration) or something.

It seems unreliable to expect the broker to return a
particular record within a particular timeout in general,
which is what these tests are doing. The broker can decide
for several reasons not to return data for a partition, but
return data for another partition instead.

It seems like the only case where you might reasonably try
to rely on that is in a test, where you first write a record
to a partition, then you assign only that one partition to a
consumer, then you poll on the consumer, expecting it to
return the data you just wrote.
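For tests like this, one way to fix forward (rather than relying on a single poll() call) is to keep polling until the expected records arrive or an overall deadline passes. The sketch below is illustrative only: `PollUntil` and `pollUntil` are hypothetical helpers, the `Function<Duration, List<T>>` stands in for `consumer.poll(Duration)` with the returned records already mapped to a list, and the millisecond clock is injected so the loop can be exercised without a broker.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongSupplier;

final class PollUntil {
    private PollUntil() {}

    /**
     * Repeatedly invokes {@code poll} until at least {@code expected} records
     * have been collected or {@code timeout} has elapsed.
     *
     * @param poll  hypothetical stand-in for consumer.poll(Duration)
     * @param clock millisecond clock, injected for testability
     */
    static <T> List<T> pollUntil(Function<Duration, List<T>> poll,
                                 int expected,
                                 Duration timeout,
                                 LongSupplier clock) {
        List<T> collected = new ArrayList<>();
        long deadline = clock.getAsLong() + timeout.toMillis();
        while (collected.size() < expected) {
            long remaining = deadline - clock.getAsLong();
            if (remaining <= 0) {
                break; // overall deadline reached; caller asserts on what was collected
            }
            // An empty batch is no longer proof that nothing is available:
            // poll() may have returned early carrying only metadata.
            collected.addAll(poll.apply(Duration.ofMillis(remaining)));
        }
        return collected;
    }
}
```

In a real test the clock would typically be `System::currentTimeMillis`.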

So the $10 question here is whether we should support this
apparently artificial (testing-only) use case to the point
where it's worth adding a whole new method to the Consumer
interface.

Thanks all,
John

On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> Thanks Jason,
> 
> We would only return the metadata for the latest fetches.
> So, if someone wanted to use this to lazily maintain a
> client-side metadata map for all partitions, they'd have to
> store it separately and merge in new updates as they arrive.
> 
> This way:
> 1. We don't need to increase the complexity of the client by
> storing that metadata
> 2. Users will be able to treat all returned metadata as
> "fresh" without having to reason about the timestamps.
> 3. All parts of the returned ConsumerRecords object have the
> same lifecycle: all the data and metadata are the results of
> the most recent round of fetch responses that had not been
> previously polled.
> 
> Does that seem sensible to you? I'll update the KIP to
> clarify this.
> 
> Thanks,
> -John
> 
> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> > Hi John,
> > 
> > Just one question. It wasn't very clear to me exactly when the metadata
> > would be returned in `ConsumerRecords`. Would we /always/ include the
> > metadata for all partitions that are assigned, or would it be based on the
> > latest fetches?
> > 
> > Thanks,
> > Jason
> > 
> > On Fri, Dec 11, 2020 at 4:07 PM John Roesler  wrote:
> > 
> > > Thanks, Guozhang!
> > > 
> > > All of your feedback sounds good to me. I’ll update the KIP when I am 
> > > able.
> > > 
> > > 3) I believe it is the position after the fetch, but I will confirm. I
> > > think omitting position may render beginning and end offsets useless as
> > > well, which leaves only lag. That would be fine with me, but it also seems
> > > nice to supply this extra metadata since it is well defined and probably
> > > handy for others. Therefore, I’d go the route of specifying the exact
> > > semantics and keeping it.
> > > 
> > > Thanks for the review,
> > > John
> > > 
> > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > Hello John,
> > > > 
> > > > Thanks for the updates! I've made a pass on the KIP and also the POC PR,
> > > > here are some minor comments:
> > > > 
> > > > 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting
> > > > updated, and we do not create a new object but just update the values
> > > > in-place, so maybe calling it `lastUpdateTimestamp` is better?
> > > > 
> > > > 2) It would be great to clarify in the javadocs that the new API
> > > > "ConsumerRecords#metadata(): Map" may return a superset of the
> > > > TopicPartitions returned by the existing API that returns the data by
> > > > partition, in case users assume the key sets of the two maps would
> > > > always be the same.
> > > > 
> > > > 3) The "position()" API of the call needs better clarification: is it the
> > > >

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2021-01-06 Thread John Roesler
Hello all,

Thanks to all who participated in the discussion and vote.
I'm closing the vote now and marking KIP-695 as accepted:

* 4 binding +1 (Guozhang, Bill, Matthias, and myself)
* 2 non-binding +1 (Bruno and Walker)

The PRs will follow shortly.

Thanks,
-John

On Fri, 2020-12-18 at 11:53 -0800, Matthias J. Sax wrote:
> Sorry that I am late to the game.
> 
> +1 (binding)
> 
> We always knew about this gap when we did KIP-353, and thus, I am glad
> that we finally address it.
> 
> 
> -Matthias
> 
> On 12/18/20 10:16 AM, John Roesler wrote:
> > Thanks for your question, Ismael,
> > 
> > Are you concerned about the consumer performance, streams performance or 
> > both?
> > 
> > On the consumer side, this is only creating one extra struct for each 
> > response partition to represent the metadata that we already have access to 
> > internally. I don’t think this would have a measurable performance impact.
> > 
> > On the streams side, I would definitely like to ensure that performance 
> > doesn’t decrease for users. I ran our internal benchmarks on my POC branch 
> > and found that the measured throughput across all operations is within the 
> > 99% confidence interval of the baseline performance of trunk. I also 
> > deployed our internal soak test from my POC branch, which includes a join 
> > operation, and I observe that the throughput of that soak cluster is 
> > identical to the soak for trunk.
> > 
> > This result is to be expected, since the semantic improvements here would 
> > only kick in for Join/Merge operations where Streams is processing faster 
> > than it can fetch some partitions on average. I would expect Streams to 
> > catch up to the fetches occasionally, but not on average. 
> > 
> > It’s also worth noting that we have seen increasing numbers of users 
> > complaining of incorrect join results due to the current implementation. 
> > Even if the new implementation showed a modest drop in performance, I would 
> > advocate for correct results over top performance by default.
> > 
> > Finally, to assuage any lingering concerns, there is a configuration 
> > available to completely disable the new semantics proposed here and revert 
> > to the prior behavior. 
> > 
> > These details seem worth mentioning in the KIP. I’ll update the document 
> > shortly. 
> > 
> > Thanks again,
> > John
> > 
> > On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote:
> > > Hi John,
> > > 
> > > It would be good to make sure these changes have no measurable performance
> > > impact for the use cases that don't need it. Have we given this some
> > > thought? And what would be the perf testing strategy to verify this?
> > > 
> > > Ismael
> > > 
> > > On Fri, Dec 18, 2020 at 8:39 AM John Roesler  wrote:
> > > 
> > > > Thanks for the votes and reviews, all. I'll wait for a
> > > > response from Jason before closing the vote, since he asked
> > > > for clarification.
> > > > 
> > > > The present count is:
> > > > * 3 binding +1 (Guozhang, Bill, and myself)
> > > > * 2 non-binding +1 (Bruno and Walker)
> > > > 
> > > > I have updated the KIP document in response to the requests
> > > > for clarification:
> > > > 1) The new metadata() map actually just contains immutable
> > > > Metadata objects representing the metadata received in the
> > > > last round of fetch responses, so I decided to stick with
> > > > `receivedTimestamp`, as that is an accurate representation of
> > > > the timestamp's meaning.
> > > > 
> > > > 2) I added a javadoc clarifying that the metadata partitions
> > > > may be a superset of the data partitions in the same
> > > > ConsumerRecords
> > > > 
> > > > 3) I confirmed that the position we are returning is the
> > > > next offset to fetch after the current returned records.
> > > > This is equivalent to the "current position" of the consumer
> > > > after the call to poll() that returns this ConsumerRecords
> > > > object
> > > > 
> > > > 4) (Jason's question about whether we include metadata for
> > > > all partitions or just the latest fetch responses) I've
> > > > clarified the javadoc to state that the metadata is only
> > > > what was included in the latest fetches.
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > 
> > > > On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> > > > > Hi John,
> > > > > 
> > > > > I've made a pass over the KIP and I think it will be a good addition.
> > > > > 
> > > > > Modulo Jason's question, I'm a +1 (binding).
> > > > > 
> > > > > Thanks,
> > > > > Bill
> > > > > 
> > > > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson  wrote:
> > > > > 
> > > > > > Hi John,
> > > > > > 
> > > > > > Just one question. It wasn't very clear to me exactly when the metadata
> > > > > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > > > > metadata for all partitions that are assigned, or would it be based on
> > > > > > the latest fetches?
> > > > > > 
> > > > > > Thanks,
> > 

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread Matthias J. Sax
Sorry that I am late to the game.

+1 (binding)

We always knew about this gap when we did KIP-353, and thus, I am glad
that we finally address it.


-Matthias

On 12/18/20 10:16 AM, John Roesler wrote:
> Thanks for your question, Ismael,
> 
> Are you concerned about the consumer performance, streams performance or both?
> 
> On the consumer side, this is only creating one extra struct for each 
> response partition to represent the metadata that we already have access to 
> internally. I don’t think this would have a measurable performance impact.
> 
> On the streams side, I would definitely like to ensure that performance 
> doesn’t decrease for users. I ran our internal benchmarks on my POC branch 
> and found that the measured throughput across all operations is within the 
> 99% confidence interval of the baseline performance of trunk. I also deployed 
> our internal soak test from my POC branch, which includes a join operation, 
> and I observe that the throughput of that soak cluster is identical to the 
> soak for trunk.
> 
> This result is to be expected, since the semantic improvements here would 
> only kick in for Join/Merge operations where Streams is processing faster 
> than it can fetch some partitions on average. I would expect Streams to catch 
> up to the fetches occasionally, but not on average. 
> 
> It’s also worth noting that we have seen increasing numbers of users 
> complaining of incorrect join results due to the current implementation. Even 
> if the new implementation showed a modest drop in performance, I would 
> advocate for correct results over top performance by default.
> 
> Finally, to assuage any lingering concerns, there is a configuration 
> available to completely disable the new semantics proposed here and revert to 
> the prior behavior. 
> 
> These details seem worth mentioning in the KIP. I’ll update the document 
> shortly. 
> 
> Thanks again,
> John
> 
> On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote:
>> Hi John,
>>
>> It would be good to make sure these changes have no measurable performance
>> impact for the use cases that don't need it. Have we given this some
>> thought? And what would be the perf testing strategy to verify this?
>>
>> Ismael
>>
>> On Fri, Dec 18, 2020 at 8:39 AM John Roesler  wrote:
>>
>>> Thanks for the votes and reviews, all. I'll wait for a
>>> response from Jason before closing the vote, since he asked
>>> for clarification.
>>>
>>> The present count is:
>>> * 3 binding +1 (Guozhang, Bill, and myself)
>>> * 2 non-binding +1 (Bruno and Walker)
>>>
>>> I have updated the KIP document in response to the requests
>>> for clarification:
>>> 1) The new metadata() map actually just contains immutable
>>> Metadata objects representing the metadata received in the
>>> last round of fetch responses, so I decided to stick with
>>> `receivedTimestamp`, as that is an accurate representation of
>>> the timestamp's meaning.
>>>
>>> 2) I added a javadoc clarifying that the metadata partitions
>>> may be a superset of the data partitions in the same
>>> ConsumerRecords
>>>
>>> 3) I confirmed that the position we are returning is the
>>> next offset to fetch after the current returned records.
>>> This is equivalent to the "current position" of the consumer
>>> after the call to poll() that returns this ConsumerRecords
>>> object
>>>
>>> 4) (Jason's question about whether we include metadata for
>>> all partitions or just the latest fetch responses) I've
>>> clarified the javadoc to state that the metadata is only
>>> what was included in the latest fetches.
>>>
>>> Thanks,
>>> -John
>>>
>>>
>>> On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
 Hi John,

 I've made a pass over the KIP and I think it will be a good addition.

 Modulo Jason's question, I'm a +1 (binding).

 Thanks,
 Bill

 On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson  wrote:

> Hi John,
>
> Just one question. It wasn't very clear to me exactly when the metadata
> would be returned in `ConsumerRecords`. Would we /always/ include the
> metadata for all partitions that are assigned, or would it be based on the
> latest fetches?
>
> Thanks,
> Jason
>
> On Fri, Dec 11, 2020 at 4:07 PM John Roesler  wrote:
>
>> Thanks, Guozhang!
>>
>> All of your feedback sounds good to me. I’ll update the KIP when I am able.
>>
>> 3) I believe it is the position after the fetch, but I will confirm. I
>> think omitting position may render beginning and end offsets useless as
>> well, which leaves only lag. That would be fine with me, but it also seems
>> nice to supply this extra metadata since it is well defined and probably
>> handy for others. Therefore, I’d go the route of specifying the exact
>> semantics and keeping it.
>>
>> Thanks for the review,
>> John
>>
>> On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wro

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread John Roesler
Thanks for your question, Ismael,

Are you concerned about the consumer performance, streams performance or both?

On the consumer side, this is only creating one extra struct for each response 
partition to represent the metadata that we already have access to internally. 
I don’t think this would have a measurable performance impact.

On the streams side, I would definitely like to ensure that performance doesn’t 
decrease for users. I ran our internal benchmarks on my POC branch and found 
that the measured throughput across all operations is within the 99% confidence 
interval of the baseline performance of trunk. I also deployed our internal 
soak test from my POC branch, which includes a join operation, and I observe 
that the throughput of that soak cluster is identical to the soak for trunk.

This result is to be expected, since the semantic improvements here would only 
kick in for Join/Merge operations where Streams is processing faster than it 
can fetch some partitions on average. I would expect Streams to catch up to the 
fetches occasionally, but not on average. 
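To make the Join/Merge case concrete, consider a task reading two input partitions that has buffered data for one but not the other. The following is a simplified, hypothetical model of the decision this KIP is about (the names, the per-partition lag map, and the single idle deadline are assumptions, not the Streams code): fresh metadata reporting zero lag lets the task proceed at once, while positive or unknown lag makes it wait until the deadline. When every partition already has buffered records, the metadata is never consulted, which is consistent with the expectation above that the common path is unaffected.

```java
import java.util.Map;

final class JoinReadiness {
    enum Decision { PROCESS, WAIT }

    private JoinReadiness() {}

    /**
     * @param buffered       hypothetical: buffered record count per input partition
     * @param knownLag       hypothetical: lag from the most recent fetch metadata, if any
     * @param nowMs          current time in milliseconds
     * @param idleDeadlineMs hypothetical: latest time the task is willing to wait until
     */
    static Decision decide(Map<String, Integer> buffered,
                           Map<String, Long> knownLag,
                           long nowMs,
                           long idleDeadlineMs) {
        for (Map.Entry<String, Integer> e : buffered.entrySet()) {
            if (e.getValue() > 0) {
                continue; // data in hand for this partition
            }
            Long lag = knownLag.get(e.getKey());
            if (lag != null && lag == 0L) {
                continue; // fresh metadata confirms nothing more on the broker
            }
            // Empty buffer and either records still on the broker (lag > 0)
            // or no metadata yet: wait unless we have already idled long enough.
            if (nowMs < idleDeadlineMs) {
                return Decision.WAIT;
            }
        }
        return Decision.PROCESS;
    }
}
```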

It’s also worth noting that we have seen increasing numbers of users 
complaining of incorrect join results due to the current implementation. Even 
if the new implementation showed a modest drop in performance, I would advocate 
for correct results over top performance by default.

Finally, to assuage any lingering concerns, there is a configuration available 
to completely disable the new semantics proposed here and revert to the prior 
behavior. 

These details seem worth mentioning in the KIP. I’ll update the document 
shortly. 

Thanks again,
John

On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote:
> Hi John,
> 
> It would be good to make sure these changes have no measurable performance
> impact for the use cases that don't need it. Have we given this some
> thought? And what would be the perf testing strategy to verify this?
> 
> Ismael
> 
> On Fri, Dec 18, 2020 at 8:39 AM John Roesler  wrote:
> 
> > Thanks for the votes and reviews, all. I'll wait for a
> > response from Jason before closing the vote, since he asked
> > for clarification.
> >
> > The present count is:
> > * 3 binding +1 (Guozhang, Bill, and myself)
> > * 2 non-binding +1 (Bruno and Walker)
> >
> > I have updated the KIP document in response to the requests
> > for clarification:
> > 1) The new metadata() map actually just contains immutable
> > Metadata objects representing the metadata received in the
> > last round of fetch responses, so I decided to stick with
> > `receivedTimestamp`, as that is an accurate representation of
> > the timestamp's meaning.
> >
> > 2) I added a javadoc clarifying that the metadata partitions
> > may be a superset of the data partitions in the same
> > ConsumerRecords
> >
> > 3) I confirmed that the position we are returning is the
> > next offset to fetch after the current returned records.
> > This is equivalent to the "current position" of the consumer
> > after the call to poll() that returns this ConsumerRecords
> > object
> >
> > 4) (Jason's question about whether we include metadata for
> > all partitions or just the latest fetch responses) I've
> > clarified the javadoc to state that the metadata is only
> > what was included in the latest fetches.
> >
> > Thanks,
> > -John
> >
> >
> > On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> > > Hi John,
> > >
> > > I've made a pass over the KIP and I think it will be a good addition.
> > >
> > > Modulo Jason's question, I'm a +1 (binding).
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson  wrote:
> > >
> > > > Hi John,
> > > >
> > > > Just one question. It wasn't very clear to me exactly when the metadata
> > > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > > metadata for all partitions that are assigned, or would it be based on
> > > > the latest fetches?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler  wrote:
> > > >
> > > > > Thanks, Guozhang!
> > > > >
> > > > > All of your feedback sounds good to me. I’ll update the KIP when I am
> > > > > able.
> > > > >
> > > > > 3) I believe it is the position after the fetch, but I will confirm. I
> > > > > think omitting position may render beginning and end offsets useless
> > > > > as well, which leaves only lag. That would be fine with me, but it also
> > > > > seems nice to supply this extra metadata since it is well defined and
> > > > > probably handy for others. Therefore, I’d go the route of specifying
> > > > > the exact semantics and keeping it.
> > > > >
> > > > > Thanks for the review,
> > > > > John
> > > > >
> > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > Hello John,
> > > > > >
> > > > > > Thanks for the updates! I've made a pass on the KIP and also the
> > > > > > POC PR, here are some minor comments:
> > > > > >
> > > > >

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread Ismael Juma
Hi John,

It would be good to make sure these changes have no measurable performance
impact for the use cases that don't need it. Have we given this some
thought? And what would be the perf testing strategy to verify this?

Ismael

On Fri, Dec 18, 2020 at 8:39 AM John Roesler  wrote:

> Thanks for the votes and reviews, all. I'll wait for a
> response from Jason before closing the vote, since he asked
> for clarification.
>
> The present count is:
> * 3 binding +1 (Guozhang, Bill, and myself)
> * 2 non-binding +1 (Bruno and Walker)
>
> I have updated the KIP document in response to the requests
> for clarification:
> 1) The new metadata() map actually just contains immutable
> Metadata objects representing the metadata received in the
> last round of fetch responses, so I decided to stick with
> `receivedTimestamp`, as that is an accurate representation of
> the timestamp's meaning.
>
> 2) I added a javadoc clarifying that the metadata partitions
> may be a superset of the data partitions in the same
> ConsumerRecords
>
> 3) I confirmed that the position we are returning is the
> next offset to fetch after the current returned records.
> This is equivalent to the "current position" of the consumer
> after the call to poll() that returns this ConsumerRecords
> object
>
> 4) (Jason's question about whether we include metadata for
> all partitions or just the latest fetch responses) I've
> clarified the javadoc to state that the metadata is only
> what was included in the latest fetches.
>
> Thanks,
> -John
>
>
> On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> > Hi John,
> >
> > I've made a pass over the KIP and I think it will be a good addition.
> >
> > Modulo Jason's question, I'm a +1 (binding).
> >
> > Thanks,
> > Bill
> >
> > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson  wrote:
> >
> > > Hi John,
> > >
> > > Just one question. It wasn't very clear to me exactly when the metadata
> > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > metadata for all partitions that are assigned, or would it be based on
> > > the latest fetches?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler  wrote:
> > >
> > > > Thanks, Guozhang!
> > > >
> > > > All of your feedback sounds good to me. I’ll update the KIP when I am
> > > > able.
> > > >
> > > > 3) I believe it is the position after the fetch, but I will confirm. I
> > > > think omitting position may render beginning and end offsets useless
> > > > as well, which leaves only lag. That would be fine with me, but it also
> > > > seems nice to supply this extra metadata since it is well defined and
> > > > probably handy for others. Therefore, I’d go the route of specifying
> > > > the exact semantics and keeping it.
> > > >
> > > > Thanks for the review,
> > > > John
> > > >
> > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > Hello John,
> > > > >
> > > > > Thanks for the updates! I've made a pass on the KIP and also the
> > > > > POC PR, here are some minor comments:
> > > > >
> > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting
> > > > > updated, and we do not create a new object but just update the values
> > > > > in-place, so maybe calling it `lastUpdateTimestamp` is better?
> > > > >
> > > > > 2) It would be great to clarify in the javadocs that the new API
> > > > > "ConsumerRecords#metadata(): Map" may return a superset of the
> > > > > TopicPartitions returned by the existing API that returns the data by
> > > > > partition, in case users assume the key sets of the two maps would
> > > > > always be the same.
> > > > >
> > > > > 3) The "position()" API of the call needs better clarification: is it
> > > > > the current position AFTER the records are returned, or is it BEFORE
> > > > > the records are returned? Personally, I'd suggest we not include it if
> > > > > it is not used anywhere yet, just to avoid possible misuse, but I'm
> > > > > fine if you'd like to keep it; in that case, just clarify its semantics.
> > > > >
> > > > >
> > > > > Other than that, I'm +1 on the KIP as well!
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > walker
> > > > > >
> > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks for the KIP, John!
> > > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > Best,
> > > > > > > Bruno
> > > > > > >
> > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > > like to go ahead and call for a vote.
> > > > > > > >
> > > > > >

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-18 Thread John Roesler
Thanks for the votes and reviews, all. I'll wait for a
response from Jason before closing the vote, since he asked
for clarification.

The present count is:
* 3 binding +1 (Guozhang, Bill, and myself)
* 2 non-binding +1 (Bruno and Walker)

I have updated the KIP document in response to the requests
for clarification:
1) The new metadata() map actually just contains immutable
Metadata objects representing the metadata received in the
last round of fetch responses, so I decided to stick with
`receivedTimestamp`, as that is an accurate representation of
the timestamp's meaning.

2) I added a javadoc clarifying that the metadata partitions
may be a superset of the data partitions in the same
ConsumerRecords

3) I confirmed that the position we are returning is the
next offset to fetch after the current returned records.
This is equivalent to the "current position" of the consumer
after the call to poll() that returns this ConsumerRecords
object

4) (Jason's question about whether we include metadata for
all partitions or just the latest fetch responses) I've
clarified the javadoc to state that the metadata is only
what was included in the latest fetches.
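A minimal sketch of how a caller might consume these semantics, under stated assumptions: `FetchInfo`, `FetchInfoCache`, and `metadataOnly` are hypothetical names, not the `ConsumerRecords` API. `FetchInfo` is immutable (item 1), `position` is the next offset to fetch after the returned records (item 3), lag is assumed to be `endOffset - position`, and, because each poll carries metadata only for the latest fetches (item 4), the cache keeps older entries and overwrites only the partitions that appear in the new poll. `metadataOnly` picks out the partitions that carry metadata but no records (item 2).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical immutable stand-in for one partition's fetch metadata. */
final class FetchInfo {
    final long position;            // next offset to fetch after the returned records
    final long endOffset;           // assumed: end offset reported by the broker
    final long receivedTimestampMs; // when this metadata was received

    FetchInfo(long position, long endOffset, long receivedTimestampMs) {
        this.position = position;
        this.endOffset = endOffset;
        this.receivedTimestampMs = receivedTimestampMs;
    }

    /** Assumed lag definition: records on the broker beyond our position, clamped at zero. */
    long lag() {
        return Math.max(0L, endOffset - position);
    }
}

/** Client-side view kept across polls, since each poll only reports the latest fetches. */
final class FetchInfoCache {
    private final Map<String, FetchInfo> latest = new HashMap<>();

    /** Overwrite partitions present in this poll; keep entries for all others. */
    void merge(Map<String, FetchInfo> fromThisPoll) {
        latest.putAll(fromThisPoll);
    }

    FetchInfo get(String partition) {
        return latest.get(partition);
    }

    /** Partitions that carry metadata but no records in the same poll. */
    static Set<String> metadataOnly(Set<String> dataPartitions, Set<String> metadataPartitions) {
        Set<String> result = new HashSet<>(metadataPartitions);
        result.removeAll(dataPartitions);
        return result;
    }
}
```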

Thanks,
-John


On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> Hi John,
> 
> I've made a pass over the KIP and I think it will be a good addition.
> 
> Modulo Jason's question, I'm a +1 (binding).
> 
> Thanks,
> Bill

Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-17 Thread John Roesler
Thanks Jason,

We would only return the metadata for the latest fetches.
So, if someone wanted to use this to lazily maintain a
client-side metadata map for all partitions, they'd have to
store it separately and merge in new updates as they arrive.

This way:
1. We don't need to increase the complexity of the client by
storing that metadata
2. Users will be able to treat all returned metadata as
"fresh" without having to reason about the timestamps.
3. All parts of the returned ConsumerRecords object have the
same lifecycle: all the data and metadata are the results of
the most recent round of fetch responses that had not been
previously polled.

Does that seem sensible to you? I'll update the KIP to
clarify this.
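Since each poll only reports metadata from its latest fetches, an application that wants a view of every assigned partition has to keep and merge that state itself, as described above. A minimal Java sketch, assuming hypothetical string partition keys (e.g. `orders-0`) and a hypothetical `Meta(receivedTimestamp, lag)` record in place of the real Kafka types:

```java
import java.util.HashMap;
import java.util.Map;

final class MetadataCache {
    // Hypothetical per-partition metadata; not the real Kafka type.
    record Meta(long receivedTimestamp, long lag) {}

    // Client-side view of the most recent metadata seen for each partition.
    private final Map<String, Meta> latestByPartition = new HashMap<>();

    // Merges the metadata returned by one poll(). Partitions missing from this
    // poll keep their previous entry; partitions present replace it, unless the
    // cached entry carries a newer receivedTimestamp.
    void merge(Map<String, Meta> fromPoll) {
        fromPoll.forEach((partition, meta) ->
            latestByPartition.merge(partition, meta,
                (cached, fresh) ->
                    fresh.receivedTimestamp() >= cached.receivedTimestamp() ? fresh : cached));
    }

    // Returns the last known metadata for a partition, or null if none was seen yet.
    Meta get(String partition) {
        return latestByPartition.get(partition);
    }
}
```

Partitions that were not part of a given poll simply keep their older entry, so the cache, not the consumer, decides when an entry is too stale to trust.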

Thanks,
-John

On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> Hi John,
> 
> Just one question. It wasn't very clear to me exactly when the metadata
> would be returned in `ConsumerRecords`. Would we /always/ include the
> metadata for all partitions that are assigned, or would it be based on the
> latest fetches?
> 
> Thanks,
> Jason


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-17 Thread Bill Bejeck
Hi John,

I've made a pass over the KIP and I think it will be a good addition.

Modulo Jason's question, I'm a +1 (binding).

Thanks,
Bill

On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson  wrote:

> Hi John,
>
> Just one question. It wasn't very clear to me exactly when the metadata
> would be returned in `ConsumerRecords`. Would we /always/ include the
> metadata for all partitions that are assigned, or would it be based on the
> latest fetches?
>
> Thanks,
> Jason


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-16 Thread Jason Gustafson
Hi John,

Just one question. It wasn't very clear to me exactly when the metadata
would be returned in `ConsumerRecords`. Would we /always/ include the
metadata for all partitions that are assigned, or would it be based on the
latest fetches?

Thanks,
Jason

On Fri, Dec 11, 2020 at 4:07 PM John Roesler  wrote:

> Thanks, Guozhang!
>
> All of your feedback sounds good to me. I’ll update the KIP when I am able.
>
> 3) I believe it is the position after the fetch, but I will confirm. I
> think omitting position may render beginning and end offsets useless as
> well, which leaves only lag. That would be fine with me, but it also seems
> nice to supply this extra metadata since it is well defined and probably
> handy for others. Therefore, I’d go the route of specifying the exact
> semantics and keeping it.
>
> Thanks for the review,
> John


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-11 Thread John Roesler
Thanks, Guozhang!

All of your feedback sounds good to me. I’ll update the KIP when I am able.

3) I believe it is the position after the fetch, but I will confirm. I think 
omitting position may render beginning and end offsets useless as well, which 
leaves only lag. That would be fine with me, but it also seems nice to supply 
this extra metadata since it is well defined and probably handy for others. 
Therefore, I’d go the route of specifying the exact semantics and keeping it.

Thanks for the review,
John

On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> Hello John,
> 
> Thanks for the updates! I've made a pass on the KIP and also the POC PR,
> here are some minor comments:
> 
> 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated,
> and we do not create a new object but just update the values in-place, so
> maybe calling it `lastUpdateTimestamp` is better?
> 
> 2) It would be great to clarify in the javadocs that the new API
> "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
> superset of the TopicPartitions returned by the existing API that returns
> the data by partition, in case users assume the two key sets are always the
> same.
> 
> 3) The "position()" API needs better clarification: is it the
> current position AFTER the records are returned, or is it BEFORE the
> records are returned? Personally, I'd suggest we not include it if it is
> not used anywhere yet, just to avoid possible misuse, but I'm fine if you
> like to keep it still; in that case just clarify its semantics.
> 
> 
> Other than that, I'm +1 on the KIP as well!
> 
> 
> Guozhang


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-11 Thread Guozhang Wang
Hello John,

Thanks for the updates! I've made a pass on the KIP and also the POC PR,
here are some minor comments:

1) nit: "receivedTimestamp" -> it seems the metadata keeps getting updated,
and we do not create a new object but just update the values in-place, so
maybe calling it `lastUpdateTimestamp` is better?

2) It would be great to clarify in the javadocs that the new API
"ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
superset of the TopicPartitions returned by the existing API that returns
the data by partition, in case users assume the two key sets are always the
same.

3) The "position()" API needs better clarification: is it the
current position AFTER the records are returned, or is it BEFORE the
records are returned? Personally, I'd suggest we not include it if it is
not used anywhere yet, just to avoid possible misuse, but I'm fine if you
like to keep it still; in that case just clarify its semantics.
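Point 2 above matters for code that walks both maps in lockstep. A minimal Java sketch of finding partitions that reported metadata but returned no records (for example, a fetch that came back empty because the partition was caught up), using plain maps with hypothetical string keys instead of the real `ConsumerRecords` types:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SupersetCheck {
    // Returns partitions that appear in the metadata map but carry no records
    // in this batch. Code that iterates only over the data partitions would
    // silently skip these.
    static Set<String> metadataOnly(Map<String, List<String>> recordsByPartition,
                                    Map<String, Long> lagByPartition) {
        Set<String> result = new HashSet<>(lagByPartition.keySet());
        // An empty record list is treated the same as an absent partition.
        recordsByPartition.forEach((partition, records) -> {
            if (!records.isEmpty()) {
                result.remove(partition);
            }
        });
        return result;
    }
}
```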


Other than that, I'm +1 on the KIP as well!


Guozhang


On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson 
wrote:

> Thanks for the KIP!
>
> +1 (non-binding)
>
> walker


-- 
-- Guozhang


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-11 Thread Walker Carlson
Thanks for the KIP!

+1 (non-binding)

walker

On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna  wrote:

> Thanks for the KIP, John!
>
> +1 (non-binding)
>
> Best,
> Bruno


Re: [VOTE] KIP-695: Improve Streams Time Synchronization

2020-12-09 Thread Bruno Cadonna

Thanks for the KIP, John!

+1 (non-binding)

Best,
Bruno

On 08.12.20 18:03, John Roesler wrote:

Hello all,

There hasn't been much discussion on KIP-695 so far, so I'd
like to go ahead and call for a vote.

As a reminder, the purpose of KIP-695 is to improve on the
"task idling" feature we introduced in KIP-353. This KIP
will allow Streams to offer deterministic time semantics in
join-type topologies. For example, it makes sure that
when you join two topics, we collate the topics by
timestamp. That was always the intent with task idling
(KIP-353), but it turns out the previous mechanism couldn't
provide the desired semantics.

The details are here:
https://cwiki.apache.org/confluence/x/JSXZCQ

Thanks,
-John
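The collation goal above can be sketched as a choice among the head records of a join task's input partitions. This is a hypothetical simplification, not Kafka Streams' actual implementation: it assumes each input exposes the timestamp of its next buffered record (empty if nothing is buffered) and the lag from its latest fetch metadata, with a negative lag meaning no metadata has arrived yet. An empty input is skipped only when its metadata says it is fully caught up; otherwise the task idles rather than risk processing records out of timestamp order.

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

final class CollationSketch {
    // Hypothetical view of one input partition of a join task.
    // headTimestamp: timestamp of the next buffered record, empty if the buffer is empty.
    // lag: records remaining on the broker per the latest fetch metadata; negative if unknown.
    record Input(String name, OptionalLong headTimestamp, long lag) {}

    // Returns the input whose head record should be processed next, or empty if
    // nothing can be processed safely right now (the task should idle, or no
    // input has buffered data).
    static Optional<Input> next(List<Input> inputs) {
        Input best = null;
        for (Input input : inputs) {
            if (input.headTimestamp().isEmpty()) {
                if (input.lag() != 0) {
                    // Lag unknown or positive: unfetched records might be older
                    // than anything buffered elsewhere, so wait for them.
                    return Optional.empty();
                }
                // Caught up: nothing older is waiting on the broker, so skip it.
                continue;
            }
            if (best == null
                    || input.headTimestamp().getAsLong() < best.headTimestamp().getAsLong()) {
                best = input;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

A real task would also cap how long it idles; KIP-353 introduced the `max.task.idle.ms` config for that purpose.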