Hi All,

I made a couple of edits to the KIP which came up during the code review.
Changes at a high level are:

1) KeyQueryMetadata enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.

Updated KIP:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
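
For anyone skimming the thread, here is a rough sketch of the partitions()
contract as discussed further down (empty Optional = default partitioning,
empty set = drop, non-empty set = send to those partitions). It is only an
illustration: SketchPartitioner, allPartitions and describe are made-up names
and this is not the actual Kafka Streams interface or the code in the PR. I am
also assuming the Set return type mentioned in the earlier mail.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionsSketch {

    // Hypothetical stand-in for a stream partitioner; NOT the real Kafka Streams interface.
    interface SketchPartitioner<K, V> {
        // Older single-partition method: null means "use the default partitioning".
        Integer partition(String topic, K key, V value, int numPartitions);

        // Newer method: by default, delegate to partition() and map null to an
        // empty Optional so that existing implementations keep their behavior.
        default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
            Integer p = partition(topic, key, value, numPartitions);
            if (p == null) {
                return Optional.empty();
            }
            return Optional.of(Collections.singleton(p));
        }
    }

    // Broadcasting without extra API sugar: list every partition explicitly.
    static Set<Integer> allPartitions(int numPartitions) {
        return IntStream.range(0, numPartitions)
                .boxed()
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Spells out how a caller could interpret the three cases.
    static String describe(Optional<Set<Integer>> result) {
        if (!result.isPresent()) {
            return "default partitioning";
        }
        if (result.get().isEmpty()) {
            return "drop record";
        }
        return "send to " + new TreeSet<>(result.get());
    }

    public static void main(String[] args) {
        SketchPartitioner<String, String> legacy = (topic, key, value, n) -> null;
        SketchPartitioner<String, String> fixed = (topic, key, value, n) -> 1;
        SketchPartitioner<String, String> broadcast = new SketchPartitioner<String, String>() {
            @Override
            public Integer partition(String topic, String key, String value, int n) {
                return null;
            }

            @Override
            public Optional<Set<Integer>> partitions(String topic, String key, String value, int n) {
                return Optional.of(allPartitions(n));
            }
        };

        System.out.println(describe(legacy.partitions("t", "k", "v", 3)));    // default partitioning
        System.out.println(describe(fixed.partitions("t", "k", "v", 3)));     // send to [1]
        System.out.println(describe(broadcast.partitions("t", "k", "v", 3))); // send to [0, 1, 2]
        System.out.println(describe(Optional.of(Collections.emptySet())));    // drop record
    }
}
```

The point of the default method is that a partitioner which only implements
the older partition() method and returns null still ends up with default
partitioning rather than a dropped record.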

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 6:43 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Thanks Bruno,
>
> Marking this as accepted.
>
> Thanks everyone for their comments/feedback.
>
> Thanks!
> Sagar.
>
> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cado...@apache.org> wrote:
>
>> Hi Sagar,
>>
>> Thanks for the update and the PR!
>>
>> +1 (binding)
>>
>> Best,
>> Bruno
>>
>> On 10.09.22 18:57, Sagar wrote:
>> > Hi Bruno,
>> >
>> > Thanks, I think these changes make sense to me. I have updated the KIP
>> > accordingly.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cado...@apache.org> wrote:
>> >
>> >> Hi Sagar,
>> >>
>> >> I would not drop the support for dropping records. I would also not
>> >> return null from partitions(). Maybe an Optional can help here. An
>> >> empty Optional would mean to use the default partitioning behavior of
>> >> the producer. So we would have:
>> >>
>> >> - non-empty Optional, non-empty list of integers: partitions to send
>> >>   the record to
>> >> - non-empty Optional, empty list of integers: drop the record
>> >> - empty Optional: use default behavior
>> >>
>> >> What do others think?
>> >>
>> >> Best,
>> >> Bruno
>> >>
>> >> On 02.09.22 13:53, Sagar wrote:
>> >>> Hello Bruno/Chris,
>> >>>
>> >>> Since these are the last set of changes (I am assuming, haha), it would
>> >>> be great if you could review the 2 options from above so that we can
>> >>> close the voting. Of course, I am happy to incorporate any other
>> >>> requisite changes.
>> >>>
>> >>> Thanks!
>> >>> Sagar.
>> >>>
>> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>> >>>
>> >>>> Thanks Bruno for the great points.
>> >>>>
>> >>>> I see 2 options here =>
>> >>>>
>> >>>> 1) As Chris suggested, drop the support for dropping records in the
>> >>>> partitioner. That way, an empty list could signify the usage of the
>> >>>> default partitioner. Also, if the deprecated partition() method returns
>> >>>> null, thereby signifying the default partitioner, partitions() can
>> >>>> return an empty list, i.e. the default partitioner.
>> >>>>
>> >>>> 2) Or we treat a null return value from the partitions() method as
>> >>>> signifying the usage of the default partitioner. In the default
>> >>>> implementation of partitions(), if partition() returns null, then
>> >>>> partitions() can also return null (instead of an empty list). The
>> >>>> RecordCollectorImpl code can also be modified accordingly. @Chris, to
>> >>>> your point, we can even drop the support for dropping records. It came
>> >>>> up during the KIP discussion, and I thought it might be a useful
>> >>>> feature. Let me know what you think.
>> >>>>
>> >>>> 3) Lastly, about the partition number check: I wanted to avoid throwing
>> >>>> an exception, so I thought adding it might be a useful feature. But as
>> >>>> you pointed out, if it can break backward compatibility, it's better to
>> >>>> remove it.
>> >>>>
>> >>>> Thanks!
>> >>>> Sagar.
>> >>>>
>> >>>>
>> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chr...@aiven.io.invalid>
>> >>>> wrote:
>> >>>>
>> >>>>> +1 to Bruno's concerns about backward compatibility. Do we actually
>> >>>>> need support for dropping records in the partitioner? It doesn't seem
>> >>>>> necessary based on the motivation for the KIP. If we remove that
>> >>>>> feature, we could handle null and/or empty lists by using the default
>> >>>>> partitioning, equivalent to how we handle null return values from the
>> >>>>> existing partition method today.
>> >>>>>
>> >>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cado...@apache.org> wrote:
>> >>>>>
>> >>>>>> Hi Sagar,
>> >>>>>>
>> >>>>>> Thank you for the updates!
>> >>>>>>
>> >>>>>> I do not intend to prolong this vote thread more than needed, but I
>> >>>>>> still have some points.
>> >>>>>>
>> >>>>>> The deprecated partition method can return null if the default
>> >>>>>> partitioning logic of the producer should be used.
>> >>>>>> With the new method partitions() it seems that it is not possible to
>> >>>>>> use the default partitioning logic anymore.
>> >>>>>>
>> >>>>>> Also, in the default implementation of method partitions(), a record
>> >>>>>> that would use the default partitioning logic in method partition()
>> >>>>>> would be dropped, which would break backward compatibility since
>> >>>>>> Streams would always call the new method partitions() even though the
>> >>>>>> users still implement the deprecated method partition().
>> >>>>>>
>> >>>>>> I have a last point that we should probably discuss on the PR and not
>> >>>>>> on the KIP, but since you added the code in the KIP I need to mention
>> >>>>>> it. I do not think you should check the validity of the partition
>> >>>>>> number since the ProducerRecord does the same check and throws an
>> >>>>>> exception. If Streams adds the same check but does not throw, the
>> >>>>>> behavior is not backward compatible.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Bruno
>> >>>>>>
>> >>>>>>
>> >>>>>> On 30.08.22 12:43, Sagar wrote:
>> >>>>>>> Thanks Bruno/Chris,
>> >>>>>>>
>> >>>>>>> I also agree that it might be better to keep it simple, the way Chris
>> >>>>>>> suggested. I have updated the KIP accordingly. I made a couple of
>> >>>>>>> minor changes to the KIP:
>> >>>>>>>
>> >>>>>>> 1) I changed the return type of the partitions method from List to
>> >>>>>>> Set. This ensures that if an implementation of StreamPartitioner is
>> >>>>>>> buggy and returns duplicate partition numbers, we won't try to send
>> >>>>>>> to the same partition multiple times.
>> >>>>>>> 2) I also added a check to send the record only to valid partition
>> >>>>>>> numbers, and to log and drop when the partition number is invalid.
>> >>>>>>> This is again to prevent errors when the StreamPartitioner
>> >>>>>>> implementation has bugs (since there are no validations as such).
>> >>>>>>> 3) I also updated the Test Plan section based on the suggestion from
>> >>>>>>> Bruno.
>> >>>>>>> 4) I updated the default implementation of the partitions method
>> >>>>>>> based on the great catch from Chris!
>> >>>>>>>
>> >>>>>>> Let me know if it looks fine now.
>> >>>>>>>
>> >>>>>>> Thanks!
>> >>>>>>> Sagar.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cado...@apache.org> wrote:
>> >>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> I am in favour of discarding the sugar for broadcasting and leaving
>> >>>>>>>> the broadcasting to the implementation as Chris suggests. I think
>> >>>>>>>> that is the cleanest option.
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Bruno
>> >>>>>>>>
>> >>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>> >>>>>>>>> Hi all,
>> >>>>>>>>>
>> >>>>>>>>> I think it'd be useful to be more explicit about broadcasting to
>> >>>>>>>>> all topic partitions rather than add implicit behavior for empty
>> >>>>>>>>> cases (empty optional, empty list, etc.). The suggested enum
>> >>>>>>>>> approach would address that nicely.
>> >>>>>>>>>
>> >>>>>>>>> It's also worth noting that there's no hard requirement to add
>> >>>>>>>>> sugar for broadcasting to all topic partitions since the API
>> >>>>>>>>> already provides the number of topic partitions available when
>> >>>>>>>>> calling a stream partitioner. If we can't find a clean way to add
>> >>>>>>>>> this support, it might be better to leave it out and just let
>> >>>>>>>>> people implement this themselves with a line of Java 8 streams:
>> >>>>>>>>>
>> >>>>>>>>>         return IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
>> >>>>>>>>>
>> >>>>>>>>> Also worth noting that there may be a bug in the default
>> >>>>>>>>> implementation for the new StreamPartitioner::partitions method,
>> >>>>>>>>> since it doesn't appear to correctly pick up on null return values
>> >>>>>>>>> from the partition method and translate them into empty lists.
>> >>>>>>>>>
>> >>>>>>>>> Cheers,
>> >>>>>>>>>
>> >>>>>>>>> Chris
>> >>>>>>>>>
>> >>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <cado...@apache.org> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Sagar,
>> >>>>>>>>>>
>> >>>>>>>>>> I do not see an issue with using an empty list together with an
>> >>>>>>>>>> empty Optional. A non-empty Optional that contains an empty list
>> >>>>>>>>>> would just say that there is no partition to which the record
>> >>>>>>>>>> should be sent. If there is no partition, the record is dropped.
>> >>>>>>>>>>
>> >>>>>>>>>> An empty Optional might be a way to say, broadcast to all
>> >>>>>>>>>> partitions.
>> >>>>>>>>>>
>> >>>>>>>>>> Alternatively -- to make it more explicit -- you could return an
>> >>>>>>>>>> object with an enum and a list of partitions. The enum could have
>> >>>>>>>>>> values SOME, ALL, and NONE. If the value is SOME, the list of
>> >>>>>>>>>> partitions contains the partitions to which to send the record. If
>> >>>>>>>>>> the value of the enum is ALL or NONE, the list of partitions is not
>> >>>>>>>>>> used and might even be null (since in that case the list should not
>> >>>>>>>>>> be used and it would be a bug to use it).
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Bruno
>> >>>>>>>>>>
>> >>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>> >>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I agree using null does seem error prone. However, I think using
>> >>>>>>>>>>> a singleton list of [-1] might be better in terms of usability. I
>> >>>>>>>>>>> am saying this because the KIP also has a provision to return an
>> >>>>>>>>>>> empty list to refer to dropping the record. So, an empty optional
>> >>>>>>>>>>> and an empty list have totally different meanings, which could get
>> >>>>>>>>>>> confusing.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Let me know what you think.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks!
>> >>>>>>>>>>> Sagar.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>> >>>>>>>>>>> <matthew.dedetr...@aiven.io.invalid> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> I also concur with this, having an Optional in the type makes it
>> >>>>>>>>>>>> very clear what’s going on and better signifies an absence of
>> >>>>>>>>>>>> value (or in this case the broadcast value).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> --
>> >>>>>>>>>>>> Matthew de Detrich
>> >>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> 2.
>> >>>>>>>>>>>>> I would prefer changing the return type of partitions() to
>> >>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>> >>>>>>>>>>>>> broadcast value. IMO, the chances that an implementation returns
>> >>>>>>>>>>>>> null due to a bug are much higher than that an implementation
>> >>>>>>>>>>>>> returns an empty Optional due to a bug.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
