Hi Sagar,

I would not drop the support for dropping records. I would also not return null from partitions(). Maybe an Optional can help here. An empty Optional would mean to use the default partitioning behavior of the producer. So we would have:

- non-empty Optional, non-empty list of integers: partitions to send the record to
- non-empty Optional, empty list of integers: drop the record
- empty Optional: use default behavior

What do other think?

Best,
Bruno

On 02.09.22 13:53, Sagar wrote:
Hello Bruno/Chris,

Since these are the last set of changes(I am assuming haha), it would be
great if you could review the 2 options from above so that we can close the
voting. Of course I am happy to incorporate any other requisite changes.

Thanks!
Sagar.

On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Thanks Bruno for the great points.

I see 2 options here =>

1) As Chris suggested, drop the support for dropping records in the
partitioner. That way, an empty list could signify the usage of a default
partitioner. Also, if the deprecated partition() method returns null
thereby signifying the default partitioner, the partitions() can return an
empty list i.e default partitioner.

2) OR we treat a null return type of partitions() method to signify the
usage of the default partitioner. In the default implementation of
partitions() method, if partition() returns null, then even partitions()
can return null(instead of an empty list). The RecordCollectorImpl code can
also be modified accordingly. @Chris, to your point, we can even drop the
support of dropping of records. It came up during KIP discussion, and I
thought it might be a useful feature. Let me know what you think.

3) Lastly about the partition number check. I wanted to avoid the throwing
of exception so I thought adding it might be a useful feature. But as you
pointed out, if it can break backwards compatibility, it's better to remove
it.

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chr...@aiven.io.invalid>
wrote:

+1 to Bruno's concerns about backward compatibility. Do we actually need
support for dropping records in the partitioner? It doesn't seem necessary
based on the motivation for the KIP. If we remove that feature, we could
handle null and/or empty lists by using the default partitioning,
equivalent to how we handle null return values from the existing partition
method today.

On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Sagar,

Thank you for the updates!

I do not intend to prolong this vote thread more than needed, but I
still have some points.

The deprecated partition method can return null if the default
partitioning logic of the producer should be used.
With the new method partitions() it seems that it is not possible to use
the default partitioning logic, anymore.

Also, in the default implementation of method partitions(), a record
that would use the default partitioning logic in method partition()
would be dropped, which would break backward compatibility since Streams
would always call the new method partitions() even though the users
still implement the deprecated method partition().

I have a last point that we should probably discuss on the PR and not on
the KIP but since you added the code in the KIP I need to mention it. I
do not think you should check the validity of the partition number since
the ProducerRecord does the same check and throws an exception. If
Streams adds the same check but does not throw, the behavior is not
backward compatible.

Best,
Bruno


On 30.08.22 12:43, Sagar wrote:
Thanks Bruno/Chris,

Even I agree that might be better to keep it simple like the way Chris
suggested. I have updated the KIP accordingly. I made couple of minor
changes to the KIP:

1) One of them being the change of return type of partitions method
from
List to Set. This is to ensure that in case the implementation of
StreamPartitoner is buggy and ends up returning duplicate
partition numbers, we won't have duplicates thereby not trying to
send to
the same partition multiple times due to this.
2) I also added a check to send the record only to valid partition
numbers
and log and drop when the partition number is invalid. This is again
to
prevent errors for cases when the StreamPartitioner implementation has
some
bugs (since there are no validations as such).
3) I also updated the Test Plan section based on the suggestion from
Bruno.
4) I updated the default implementation of partitions method based on
the
great catch from Chris!

Let me know if it looks fine now.

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cado...@apache.org>
wrote:

Hi,

I am favour of discarding the sugar for broadcasting and leave the
broadcasting to the implementation as Chris suggests. I think that is
the cleanest option.

Best,
Bruno

On 29.08.22 19:50, Chris Egerton wrote:
Hi all,

I think it'd be useful to be more explicit about broadcasting to all
topic
partitions rather than add implicit behavior for empty cases (empty
optional, empty list, etc.). The suggested enum approach would
address
that
nicely.

It's also worth noting that there's no hard requirement to add sugar
for
broadcasting to all topic partitions since the API already provides
the
number of topic partitions available when calling a stream
partitioner.
If
we can't find a clean way to add this support, it might be better to
leave
it out and just let people implement this themselves with a line of
Java
8
streams:

       return IntStream.range(0,
numPartitions).boxed().collect(Collectors.toList());

Also worth noting that there may be a bug in the default
implementation
for
the new StreamPartitioner::partitions method, since it doesn't
appear
to
correctly pick up on null return values from the partition method
and
translate them into empty lists.

Cheers,

Chris

On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi Sagar,

I do not see an issue with using an empty list together with an
empty
Optional. A non-empty Optional that contains an empty list would
just
say that there is no partition to which the record should be sent.
If
there is no partition, the record is dropped.

An empty Optional might be a way to say, broadcast to all
partitions.

Alternatively -- to make it more explicit -- you could return an
object
with an enum and a list of partitions. The enum could have values
SOME,
ALL, and NONE. If the value is SOME, the list of partitions
contains
the
partitions to which to send the record. If the value of the enum is
ALL
or NONE, the list of partitions is not used and might be even null
(since in that case the list should not be used and it would be a
bug
to
use it).

Best,
Bruno

On 24.08.22 20:11, Sagar wrote:
Thank you Bruno/Matthew for your comments.

I agree using null does seem error prone. However I think using a
singleton
list of [-1] might be better in terms of usability, I am saying
this
because the KIP also has a provision to return an empty list to
refer
to
dropping the record. So, an empty optional and an empty list have
totally
different meanings which could get confusing.

Let me know what you think.

Thanks!
Sagar.


On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
<matthew.dedetr...@aiven.io.invalid> wrote:

I also concur with this, having an Optional in the type makes it
very
clear what’s going on and better signifies an absence of value
(or
in
this
case the broadcast value).

--
Matthew de Detrich
Aiven Deutschland GmbH
Immanuelkirchstraße 26, 10405 Berlin
Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
m: +491603708037
w: aiven.io e: matthew.dedetr...@aiven.io
On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:

2.
I would prefer changing the return type of partitions() to
Optional<List<Integer>> and using Optional.empty() as the
broadcast
value. IMO, The chances that an implementation returns null due
to
a
bug
is much higher than that an implementation returns an empty
Optional
due
to a bug.










Reply via email to