Thanks.

Glad we could align.


-Matthias

On 12/21/22 2:09 AM, Sagar wrote:
Hi All,

Just as an update, the changes described here:

```
Changes at a high level are:

1) KeyQueryMetada enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.
```

are reverted back. As things stand,  KeyQueryMetada exposes only the
partition() method and the restriction for single partition is added back
for IQ. This has been done based on the points raised by Matthias above.

The KIP has been updated accordingly.

Thanks!
Sagar.

On Sat, Dec 10, 2022 at 12:09 AM Sagar <sagarmeansoc...@gmail.com> wrote:

Hey Matthias,

Actually I had shared the PR link for any potential issues that might have
gone missing. I guess it didn't come out that way in my response. Apologies
for that!

I am more than happy to incorporate any feedback/changes or address any
concerns that are still present around this at this point as well.

And I would keep in mind the feedback to provide more time in such a
scenario.

Thanks!
Sagar.

On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax <mj...@apache.org> wrote:

It is what it is.

we did have internal discussions on this

We sometimes have the case that a KIP need adjustment as stuff is
discovered during coding. And having a discussion on the PR about it is
fine. -- However, before the PR gets merge, the KIP change should be
announced to verify that nobody has objections to he change, before we
carry forward.

It's up to the committer who reviews/merges the PR to ensure that this
process is followed IMHO. I hope we can do better next time.

(And yes, there was the 3.4 release KIP deadline that might explain it,
but it seems important that we give enough time is make "tricky" changes
and not rush into stuff IMHO.)


-Matthias


On 12/8/22 7:04 PM, Sagar wrote:
Thanks Matthias,

Well, as things stand, we did have internal discussions on this and it
seemed ok to open it up for IQ and more importantly not ok to have it
opened up for FK-Join. And more importantly, the PR for this is already
merged and some of these things came up during that. Here's the PR link:
https://github.com/apache/kafka/pull/12803.

Thanks!
Sagar.


On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax <mj...@apache.org>
wrote:

Ah. Missed it as it does not have a nice "code block" similar to
`StreamPartitioner` changes.

I understand the motivation, but I am wondering if we might head into a
tricky direction? State stores (at least the built-in ones) and IQ are
kinda build with the idea to have sharded data and that a multi-cast of
keys is an anti-pattern?

Maybe it's fine, but I also don't want to open Pandora's Box. Are we
sure that generalizing the concepts does not cause issues in the
future?

Ie, should we claim that the multi-cast feature should be used for
KStreams only, but not for KTables?

Just want to double check that we are not doing something we regret
later.


-Matthias


On 12/7/22 6:45 PM, Sagar wrote:
Hi Mathias,

I did save it. The changes are added under Public Interfaces (Pt#2
about
enhancing KeyQueryMetadata with partitions method) and
throwing IllegalArgumentException when StreamPartitioner#partitions
method
returns multiple partitions for just FK-join instead of the earlier
decided
FK-Join and IQ.

The background is that for IQ, if the users have multi casted records
to
multiple partitions during ingestion but the fetch returns only a
single
partition, then it would be wrong. That's why the restriction was
lifted
for IQ and that's the reason KeyQueryMetadata now has another
partitions()
method to signify the same.

FK-Join also has a similar case, but while reviewing it was felt that
FK-Join on it's own is fairly complicated and we don't need this
feature
right away so the restriction still exists.

Thanks!
Sagar.


On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org>
wrote:

I don't see any update on the wiki about it. Did you forget to hit
"save"?

Can you also provide some background? I am not sure right now if I
understand the proposed changes?


-Matthias

On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
Thanks Sagar, this makes sense to me -- we clearly need additional
changes
to
avoid breaking IQ when using this feature, but I agree with
continuing
to
restrict
FKJ since they wouldn't stop working without it, and would become
much
harder
to reason about (than they already are) if we did enable them to use
it.

And of course, they can still multicast the final results of a FKJ,
they
just can't
mess with the internal workings of it in this way.

On Tue, Dec 6, 2022 at 9:48 AM Sagar <sagarmeansoc...@gmail.com>
wrote:

Hi All,

I made a couple of edits to the KIP which came up during the code
review.
Changes at a high level are:

1) KeyQueryMetada enhanced to have a new method called
partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.

Updated KIP:



https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 6:43 PM Sagar <sagarmeansoc...@gmail.com>
wrote:

Thanks Bruno,

Marking this as accepted.

Thanks everyone for their comments/feedback.

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cado...@apache.org

wrote:

Hi Sagar,

Thanks for the update and the PR!

+1 (binding)

Best,
Bruno

On 10.09.22 18:57, Sagar wrote:
Hi Bruno,

Thanks, I think these changes make sense to me. I have updated
the
KIP
accordingly.

Thanks!
Sagar.

On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <
cado...@apache.org>
wrote:

Hi Sagar,

I would not drop the support for dropping records. I would also
not
return null from partitions(). Maybe an Optional can help
here. An
empty
Optional would mean to use the default partitioning behavior of
the
producer. So we would have:

- non-empty Optional, non-empty list of integers: partitions to
send
the
record to
- non-empty Optional, empty list of integers: drop the record
- empty Optional: use default behavior

What do other think?

Best,
Bruno

On 02.09.22 13:53, Sagar wrote:
Hello Bruno/Chris,

Since these are the last set of changes(I am assuming haha),
it
would
be
great if you could review the 2 options from above so that we
can
close
the
voting. Of course I am happy to incorporate any other
requisite
changes.

Thanks!
Sagar.

On Wed, Aug 31, 2022 at 10:07 PM Sagar <
sagarmeansoc...@gmail.com>
wrote:

Thanks Bruno for the great points.

I see 2 options here =>

1) As Chris suggested, drop the support for dropping records
in
the
partitioner. That way, an empty list could signify the usage
of
a
default
partitioner. Also, if the deprecated partition() method
returns
null
thereby signifying the default partitioner, the partitions()
can
return
an
empty list i.e default partitioner.

2) OR we treat a null return type of partitions() method to
signify
the
usage of the default partitioner. In the default
implementation
of
partitions() method, if partition() returns null, then even
partitions()
can return null(instead of an empty list). The
RecordCollectorImpl
code
can
also be modified accordingly. @Chris, to your point, we can
even
drop
the
support of dropping of records. It came up during KIP
discussion,
and I
thought it might be a useful feature. Let me know what you
think.

3) Lastly about the partition number check. I wanted to avoid
the
throwing
of exception so I thought adding it might be a useful
feature.
But
as
you
pointed out, if it can break backwards compatibility, it's
better
to
remove
it.

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
<chr...@aiven.io.invalid>
wrote:

+1 to Bruno's concerns about backward compatibility. Do we
actually
need
support for dropping records in the partitioner? It doesn't
seem
necessary
based on the motivation for the KIP. If we remove that
feature,
we
could
handle null and/or empty lists by using the default
partitioning,
equivalent to how we handle null return values from the
existing
partition
method today.

On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
cado...@apache.org

wrote:

Hi Sagar,

Thank you for the updates!

I do not intend to prolong this vote thread more than
needed,
but I
still have some points.

The deprecated partition method can return null if the
default
partitioning logic of the producer should be used.
With the new method partitions() it seems that it is not
possible
to
use
the default partitioning logic, anymore.

Also, in the default implementation of method
partitions(), a
record
that would use the default partitioning logic in method
partition()
would be dropped, which would break backward compatibility
since
Streams
would always call the new method partitions() even though
the
users
still implement the deprecated method partition().

I have a last point that we should probably discuss on the
PR
and
not
on
the KIP but since you added the code in the KIP I need to
mention
it.
I
do not think you should check the validity of the partition
number
since
the ProducerRecord does the same check and throws an
exception.
If
Streams adds the same check but does not throw, the
behavior
is
not
backward compatible.

Best,
Bruno


On 30.08.22 12:43, Sagar wrote:
Thanks Bruno/Chris,

Even I agree that might be better to keep it simple like
the
way
Chris
suggested. I have updated the KIP accordingly. I made
couple
of
minor
changes to the KIP:

1) One of them being the change of return type of
partitions
method
from
List to Set. This is to ensure that in case the
implementation
of
StreamPartitoner is buggy and ends up returning duplicate
partition numbers, we won't have duplicates thereby not
trying
to
send to
the same partition multiple times due to this.
2) I also added a check to send the record only to valid
partition
numbers
and log and drop when the partition number is invalid.
This
is
again
to
prevent errors for cases when the StreamPartitioner
implementation
has
some
bugs (since there are no validations as such).
3) I also updated the Test Plan section based on the
suggestion
from
Bruno.
4) I updated the default implementation of partitions
method
based on
the
great catch from Chris!

Let me know if it looks fine now.

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
cado...@apache.org

wrote:

Hi,

I am favour of discarding the sugar for broadcasting and
leave
the
broadcasting to the implementation as Chris suggests. I
think
that
is
the cleanest option.

Best,
Bruno

On 29.08.22 19:50, Chris Egerton wrote:
Hi all,

I think it'd be useful to be more explicit about
broadcasting
to
all
topic
partitions rather than add implicit behavior for empty
cases
(empty
optional, empty list, etc.). The suggested enum approach
would
address
that
nicely.

It's also worth noting that there's no hard requirement
to
add
sugar
for
broadcasting to all topic partitions since the API
already
provides
the
number of topic partitions available when calling a
stream
partitioner.
If
we can't find a clean way to add this support, it might
be
better
to
leave
it out and just let people implement this themselves
with a
line of
Java
8
streams:

            return IntStream.range(0,
numPartitions).boxed().collect(Collectors.toList());

Also worth noting that there may be a bug in the default
implementation
for
the new StreamPartitioner::partitions method, since it
doesn't
appear
to
correctly pick up on null return values from the
partition
method
and
translate them into empty lists.

Cheers,

Chris

On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
cado...@apache.org>
wrote:

Hi Sagar,

I do not see an issue with using an empty list together
with
an
empty
Optional. A non-empty Optional that contains an empty
list
would
just
say that there is no partition to which the record
should
be
sent.
If
there is no partition, the record is dropped.

An empty Optional might be a way to say, broadcast to
all
partitions.

Alternatively -- to make it more explicit -- you could
return
an
object
with an enum and a list of partitions. The enum could
have
values
SOME,
ALL, and NONE. If the value is SOME, the list of
partitions
contains
the
partitions to which to send the record. If the value of
the
enum
is
ALL
or NONE, the list of partitions is not used and might
be
even
null
(since in that case the list should not be used and it
would
be a
bug
to
use it).

Best,
Bruno

On 24.08.22 20:11, Sagar wrote:
Thank you Bruno/Matthew for your comments.

I agree using null does seem error prone. However I
think
using a
singleton
list of [-1] might be better in terms of usability, I
am
saying
this
because the KIP also has a provision to return an
empty
list
to
refer
to
dropping the record. So, an empty optional and an
empty
list
have
totally
different meanings which could get confusing.

Let me know what you think.

Thanks!
Sagar.


On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de
Detrich
<matthew.dedetr...@aiven.io.invalid> wrote:

I also concur with this, having an Optional in the
type
makes it
very
clear what’s going on and better signifies an
absence of
value
(or
in
this
case the broadcast value).

--
Matthew de Detrich
Aiven Deutschland GmbH
Immanuelkirchstraße 26, 10405 Berlin
Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
m: +491603708037
w: aiven.io e: matthew.dedetr...@aiven.io
On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
wrote:

2.
I would prefer changing the return type of
partitions()
to
Optional<List<Integer>> and using Optional.empty()
as
the
broadcast
value. IMO, The chances that an implementation
returns
null
due
to
a
bug
is much higher than that an implementation returns
an
empty
Optional
due
to a bug.























Reply via email to