Thanks for your question, Ismael,

Are you concerned about consumer performance, Streams performance, or both?

On the consumer side, this is only creating one extra struct for each response 
partition to represent the metadata that we already have access to internally. 
I don’t think this would have a measurable performance impact.
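
As a rough illustration of why the cost should be small, here is a minimal 
sketch of the kind of per-partition object involved. It is not the actual 
consumer code: the class names, the String partition key, the 
{position, endOffset} array layout, and the lag calculation are all 
hypothetical stand-ins chosen for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable holder for metadata the consumer already tracks internally.
final class FetchedPartitionMetadata {
    final long receivedTimestamp; // when the fetch response was received
    final long position;          // next offset to fetch
    final long endOffset;         // end offset reported with the fetch

    FetchedPartitionMetadata(long receivedTimestamp, long position, long endOffset) {
        this.receivedTimestamp = receivedTimestamp;
        this.position = position;
        this.endOffset = endOffset;
    }

    // Assumption for this sketch: lag is the distance from position to end offset.
    long lag() {
        return Math.max(0L, endOffset - position);
    }
}

final class MetadataSketch {
    // Allocates exactly one small object per partition present in the response.
    // Each value in `response` is a hypothetical {position, endOffset} pair.
    static Map<String, FetchedPartitionMetadata> fromResponse(
            Map<String, long[]> response, long nowMs) {
        Map<String, FetchedPartitionMetadata> out = new HashMap<>();
        for (Map.Entry<String, long[]> e : response.entrySet()) {
            long[] v = e.getValue();
            out.put(e.getKey(), new FetchedPartitionMetadata(nowMs, v[0], v[1]));
        }
        return Collections.unmodifiableMap(out);
    }
}
```

The extra work is one allocation and three field writes per response 
partition, which is small next to deserializing the records themselves.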

On the Streams side, I would definitely like to ensure that performance doesn’t 
decrease for users. I ran our internal benchmarks on my POC branch and found 
that the measured throughput across all operations is within the 99% confidence 
interval of the baseline performance of trunk. I also deployed our internal 
soak test from my POC branch, which includes a join operation, and I observed 
that the throughput of that soak cluster is identical to that of the soak 
cluster for trunk.
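
For readers who want to run the same kind of check on their own workload, 
here is a minimal sketch of a "within the 99% confidence interval of the 
baseline" test. It is not our internal benchmark harness. The class and method 
names are hypothetical, the sample values are made up purely for illustration, 
and it uses a normal approximation (z = 2.576); with only a handful of runs, a 
Student's t quantile would be more appropriate.

```java
import java.util.Arrays;

final class ThroughputCheck {
    // z-score for a two-sided 99% interval under a normal approximation
    static final double Z_99 = 2.576;

    static double mean(double[] xs) {
        return Arrays.stream(xs).average().orElse(Double.NaN);
    }

    // Sample standard deviation (n - 1 in the denominator)
    static double sampleStdDev(double[] xs) {
        double m = mean(xs);
        double sumSq = 0.0;
        for (double x : xs) {
            sumSq += (x - m) * (x - m);
        }
        return Math.sqrt(sumSq / (xs.length - 1));
    }

    // Returns {lower, upper} bounds of the 99% interval for the baseline mean
    static double[] confidenceInterval99(double[] xs) {
        double m = mean(xs);
        double halfWidth = Z_99 * sampleStdDev(xs) / Math.sqrt(xs.length);
        return new double[] {m - halfWidth, m + halfWidth};
    }

    static boolean withinBaseline(double[] baselineRuns, double candidate) {
        double[] ci = confidenceInterval99(baselineRuns);
        return candidate >= ci[0] && candidate <= ci[1];
    }

    public static void main(String[] args) {
        // Made-up throughput samples, for illustration only
        double[] trunkRuns = {100.0, 102.0, 98.0, 101.0, 99.0};
        System.out.println(withinBaseline(trunkRuns, 100.5));
    }
}
```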

This result is to be expected, since the semantics improvement proposed here 
would only kick in for Join/Merge operations where Streams is processing faster 
than it can fetch some partitions on average. I would expect Streams to catch 
up to the fetches occasionally, but not on average. 

It’s also worth noting that we have seen increasing numbers of users 
complaining of incorrect join results due to the current implementation. Even 
if the new implementation showed a modest drop in performance, I would advocate 
for correct results over top performance by default.

Finally, to assuage any lingering concerns, there is a configuration available 
to completely disable the new semantics proposed here and revert to the prior 
behavior. 
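
As a sketch of what opting out might look like: this message does not name 
the configuration, so the example below assumes it is the `max.task.idle.ms` 
setting introduced by KIP-353 and that a value of -1 disables the new 
semantics. Both the key and the value are assumptions to verify against the 
KIP document. The application id and bootstrap address are placeholders.

```java
import java.util.Properties;

final class StreamsConfigSketch {
    static Properties withPriorBehavior() {
        Properties props = new Properties();
        props.put("application.id", "join-app");          // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Assumption: -1 disables the new semantics; check the KIP for the real value.
        props.put("max.task.idle.ms", "-1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withPriorBehavior().getProperty("max.task.idle.ms"));
    }
}
```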

These details seem worth mentioning in the KIP. I’ll update the document 
shortly. 

Thanks again,
John

On Fri, Dec 18, 2020, at 11:45, Ismael Juma wrote:
> Hi John,
> 
> It would be good to make sure these changes have no measurable performance
> impact for the use cases that don't need it. Have we given this some
> thought? And what would be the perf testing strategy to verify this?
> 
> Ismael
> 
> On Fri, Dec 18, 2020 at 8:39 AM John Roesler <vvcep...@apache.org> wrote:
> 
> > Thanks for the votes and reviews, all. I'll wait for a
> > response from Jason before closing the vote, since he asked
> > for clarification.
> >
> > The present count is:
> > * 3 binding +1 (Guozhang, Bill, and myself)
> > * 2 non-binding +1 (Bruno and Walker)
> >
> > I have updated the KIP document in response to the requests
> > for clarification:
> > 1) The new metadata() map actually just contains immutable
> > Metadata objects representing the metadata received in the
> > last round of fetch responses, so I decided to stick with
> > `receivedTimestamp`, as that is an accurate representation of
> > the timestamp's meaning.
> >
> > 2) I added a javadoc clarifying that the metadata partitions
> > may be a superset of the data partitions in the same
> > ConsumerRecords.
> >
> > 3) I confirmed that the position we are returning is the
> > next offset to fetch after the currently returned records.
> > This is equivalent to the "current position" of the consumer
> > after the call to poll() that returns this ConsumerRecords
> > object.
> >
> > 4) (Jason's question about whether we include metadata for
> > all partitions or just the latest fetch responses) I've
> > clarified the javadoc to state that the metadata is only
> > what was included in the latest fetches.
> >
> > Thanks,
> > -John
> >
> >
> > On Thu, 2020-12-17 at 11:42 -0500, Bill Bejeck wrote:
> > > Hi John,
> > >
> > > I've made a pass over the KIP and I think it will be a good addition.
> > >
> > > Modulo Jason's question, I'm a +1 (binding).
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Dec 16, 2020 at 1:29 PM Jason Gustafson <ja...@confluent.io> wrote:
> > >
> > > > Hi John,
> > > >
> > > > Just one question. It wasn't very clear to me exactly when the metadata
> > > > would be returned in `ConsumerRecords`. Would we /always/ include the
> > > > metadata for all partitions that are assigned, or would it be based on
> > > > the latest fetches?
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > >
> > > > > Thanks, Guozhang!
> > > > >
> > > > > All of your feedback sounds good to me. I’ll update the KIP when I am
> > > > > able.
> > > > >
> > > > > 3) I believe it is the position after the fetch, but I will confirm. I
> > > > > think omitting position may render beginning and end offsets useless as
> > > > > well, which leaves only lag. That would be fine with me, but it also
> > > > > seems nice to supply this extra metadata since it is well defined and
> > > > > probably handy for others. Therefore, I’d go the route of specifying the
> > > > > exact semantics and keeping it.
> > > > >
> > > > > Thanks for the review,
> > > > > John
> > > > >
> > > > > On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> > > > > > Hello John,
> > > > > >
> > > > > > > Thanks for the updates! I've made a pass on the KIP and also the POC
> > > > > > > PR. Here are some minor comments:
> > > > > >
> > > > > > > 1) nit: "receivedTimestamp" -> it seems the metadata keeps getting
> > > > > > > updated, and we do not create a new object but just update the values
> > > > > > > in-place, so maybe calling it `lastUpdateTimstamp` is better?
> > > > > >
> > > > > > > 2) It would be great to clarify in the javadocs that the new API
> > > > > > > "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
> > > > > > > superset of the TopicPartitions returned by the existing API that
> > > > > > > returns the data by partition, in case users assume the key sets of the
> > > > > > > two maps would always be the same.
> > > > > >
> > > > > > > 3) The "position()" API of the call needs better clarification: is it
> > > > > > > the current position AFTER the records are returned, or is it BEFORE
> > > > > > > the records are returned? Personally, I'd suggest we do not include it
> > > > > > > if it is not used anywhere yet, just to avoid possible misuse, but I'm
> > > > > > > fine if you would still like to keep it; in that case, just clarify its
> > > > > > > semantics.
> > > > > >
> > > > > >
> > > > > > > Other than that, I'm +1 on the KIP as well!
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > > On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks for the KIP!
> > > > > > >
> > > > > > > +1 (non-binding)
> > > > > > >
> > > > > > > walker
> > > > > > >
> > > > > > > > On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Thanks for the KIP, John!
> > > > > > > >
> > > > > > > > +1 (non-binding)
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Bruno
> > > > > > > >
> > > > > > > > On 08.12.20 18:03, John Roesler wrote:
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > There hasn't been much discussion on KIP-695 so far, so I'd
> > > > > > > > > like to go ahead and call for a vote.
> > > > > > > > >
> > > > > > > > > > As a reminder, the purpose of KIP-695 is to improve on the
> > > > > > > > > > "task idling" feature we introduced in KIP-353. This KIP
> > > > > > > > > > will allow Streams to offer deterministic time semantics in
> > > > > > > > > > join-type topologies. For example, it makes sure that
> > > > > > > > > > when you join two topics, we collate the topics by
> > > > > > > > > > timestamp. That was always the intent with task idling
> > > > > > > > > > (KIP-353), but it turns out the previous mechanism couldn't
> > > > > > > > > > provide the desired semantics.
> > > > > > > > >
> > > > > > > > > The details are here:
> > > > > > > > > https://cwiki.apache.org/confluence/x/JSXZCQ
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > -John
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> >
> >
> >
>
