Hi All,

I made a couple of edits to the KIP that came up during the code review. At a high level, the changes are:

1) KeyQueryMetadata is enhanced with a new method called partitions().
2) The restriction of a single partition for IQ has been lifted. The restriction now holds only for FK joins.

Updated KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 6:43 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Thanks Bruno,

Marking this as accepted.

Thanks, everyone, for your comments and feedback.

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cado...@apache.org> wrote:

Hi Sagar,

Thanks for the update and the PR!

+1 (binding)

Best,
Bruno

On 10.09.22 18:57, Sagar wrote:

Hi Bruno,

Thanks, I think these changes make sense. I have updated the KIP accordingly.

Thanks!
Sagar.

On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cado...@apache.org> wrote:

Hi Sagar,

I would not drop the support for dropping records. I would also not return null from partitions(). Maybe an Optional can help here. An empty Optional would mean to use the default partitioning behavior of the producer. So we would have:

- non-empty Optional, non-empty list of integers: partitions to send the record to
- non-empty Optional, empty list of integers: drop the record
- empty Optional: use default behavior

What do others think?

Best,
Bruno

On 02.09.22 13:53, Sagar wrote:

Hello Bruno/Chris,

Since these are the last set of changes (I am assuming, haha), it would be great if you could review the two options above so that we can close the voting. Of course, I am happy to incorporate any other requisite changes.

Thanks!
Sagar.

On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Thanks Bruno for the great points.

I see two options here:

1) As Chris suggested, drop the support for dropping records in the partitioner. That way, an empty list could signify the use of the default partitioner. Also, if the deprecated partition() method returns null, thereby signifying the default partitioner, partitions() can return an empty list, i.e. the default partitioner.

2) Or we treat a null return value from the partitions() method as signifying the use of the default partitioner. In the default implementation of the partitions() method, if partition() returns null, then partitions() can also return null (instead of an empty list). The RecordCollectorImpl code can be modified accordingly. @Chris, to your point, we can even drop the support for dropping records. It came up during the KIP discussion, and I thought it might be a useful feature. Let me know what you think.

3) Lastly, about the partition number check: I wanted to avoid throwing an exception, so I thought adding it might be a useful feature. But as you pointed out, if it can break backward compatibility, it's better to remove it.

Thanks!
Sagar.

On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

+1 to Bruno's concerns about backward compatibility. Do we actually need support for dropping records in the partitioner? It doesn't seem necessary based on the motivation for the KIP. If we remove that feature, we could handle null and/or empty lists by using the default partitioning, equivalent to how we handle null return values from the existing partition method today.
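The three cases Bruno proposed on Sep 7 (which Sagar folded into the KIP in the Sep 10 update) can be sketched in Java as below. This is a minimal illustration, not Kafka's actual code: `SketchPartitioner`, `Routing`, and `RoutingDemo` are hypothetical names defined locally so the example compiles without Kafka on the classpath. It assumes the `Set<Integer>` return type from Sagar's Aug 30 update, and its default `partitions()` maps a null from the legacy `partition()` to `Optional.empty()`, which preserves the default-partitioning behavior that Bruno and Chris were concerned about.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for Kafka Streams' StreamPartitioner, defined locally so the
// sketch compiles without Kafka. It is NOT the real interface or its source.
interface SketchPartitioner<K, V> {

    // Legacy single-partition method: a null return means "use the producer's
    // default partitioning".
    Integer partition(String topic, K key, V value, int numPartitions);

    // Multi-partition method. The default bridges to partition() and maps null to
    // Optional.empty() (default partitioning), never to an empty set (drop).
    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        Integer p = partition(topic, key, value, numPartitions);
        if (p == null) {
            return Optional.empty();
        }
        return Optional.of(Collections.singleton(p));
    }
}

final class Routing {
    private Routing() {}

    // Interprets a partitions() result using the three cases from the Sep 7 proposal.
    static String describe(Optional<Set<Integer>> result) {
        if (!result.isPresent()) {
            return "default";                       // empty Optional: producer's default partitioning
        }
        Set<Integer> targets = result.get();
        if (targets.isEmpty()) {
            return "drop";                          // non-empty Optional, empty set: drop the record
        }
        return "send to " + new TreeSet<>(targets); // one copy per listed partition (sorted for display)
    }
}

class RoutingDemo {
    public static void main(String[] args) {
        // A legacy-style partitioner: null keys fall back to default partitioning.
        SketchPartitioner<String, String> legacy = (topic, key, value, n) ->
            key == null ? null : Integer.valueOf(Math.floorMod(key.hashCode(), n));

        System.out.println(Routing.describe(legacy.partitions("orders", null, "v", 4)));
        System.out.println(Routing.describe(legacy.partitions("orders", "k1", "v", 4)));
        System.out.println(Routing.describe(Optional.of(Collections.<Integer>emptySet())));
    }
}
```

Keeping "drop" (an empty set) distinct from "default" (an empty Optional) is what lets an existing partitioner that returns null keep its old behavior when Streams calls `partitions()` instead of `partition()`.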

On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Sagar,

Thank you for the updates!

I do not intend to prolong this vote thread more than needed, but I still have some points.

The deprecated partition method can return null if the default partitioning logic of the producer should be used. With the new method partitions(), it seems that it is no longer possible to use the default partitioning logic.

Also, in the default implementation of method partitions(), a record that would use the default partitioning logic in method partition() would be dropped. This would break backward compatibility, since Streams would always call the new method partitions() even though users still implement the deprecated method partition().

I have a last point that we should probably discuss on the PR and not on the KIP, but since you added the code in the KIP, I need to mention it. I do not think you should check the validity of the partition number, since the ProducerRecord does the same check and throws an exception. If Streams adds the same check but does not throw, the behavior is not backward compatible.

Best,
Bruno

On 30.08.22 12:43, Sagar wrote:

Thanks Bruno/Chris,

I also agree that it might be better to keep it simple, the way Chris suggested. I have updated the KIP accordingly. I made a couple of minor changes to the KIP:

1) One of them is changing the return type of the partitions method from List to Set. This ensures that if a StreamPartitioner implementation is buggy and returns duplicate partition numbers, we won't have duplicates and thus won't try to send to the same partition multiple times.
2) I also added a check to send the record only to valid partition numbers, and to log and drop it when the partition number is invalid. This is again to prevent errors when the StreamPartitioner implementation has bugs (since there are no validations as such).
3) I also updated the Test Plan section based on the suggestion from Bruno.
4) I updated the default implementation of the partitions method based on the great catch from Chris!

Let me know if it looks fine now.

Thanks!
Sagar.

On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cado...@apache.org> wrote:

Hi,

I am in favour of discarding the sugar for broadcasting and leaving the broadcasting to the implementation, as Chris suggests. I think that is the cleanest option.

Best,
Bruno

On 29.08.22 19:50, Chris Egerton wrote:

Hi all,

I think it'd be useful to be more explicit about broadcasting to all topic partitions rather than add implicit behavior for empty cases (empty Optional, empty list, etc.). The suggested enum approach would address that nicely.
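The enum approach Chris refers to is Bruno's Aug 29 alternative (quoted further down): return an object holding an enum (SOME, ALL, NONE) plus a partition list that is only meaningful for SOME. A minimal Java sketch follows; `PartitionChoice` and its methods are hypothetical names, and two behaviors are assumptions of this sketch rather than anything decided in the thread: `some()` rejects an empty set so that dropping can only be expressed as NONE, and reading the list for ALL or NONE throws instead of returning null, so the bug Bruno mentions fails loudly.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical result type for the SOME / ALL / NONE alternative; not part of Kafka.
final class PartitionChoice {

    enum Kind { SOME, ALL, NONE }

    private final Kind kind;
    private final Set<Integer> partitions; // only meaningful when kind == SOME

    private PartitionChoice(Kind kind, Set<Integer> partitions) {
        this.kind = kind;
        this.partitions = partitions;
    }

    // Send to the given partitions. Assumption of this sketch: an empty set is
    // rejected so that "drop" can only be expressed as none().
    static PartitionChoice some(Set<Integer> partitions) {
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException("use none() to drop a record");
        }
        return new PartitionChoice(Kind.SOME, Collections.unmodifiableSet(new TreeSet<>(partitions)));
    }

    static PartitionChoice all() { return new PartitionChoice(Kind.ALL, null); }

    static PartitionChoice none() { return new PartitionChoice(Kind.NONE, null); }

    Kind kind() { return kind; }

    // Reading the list for ALL or NONE is a bug, so fail loudly instead of returning null.
    Set<Integer> partitions() {
        if (kind != Kind.SOME) {
            throw new IllegalStateException("partitions() is only defined for SOME, not " + kind);
        }
        return partitions;
    }

    // Expands the choice into concrete target partitions for a topic with numPartitions partitions.
    Set<Integer> resolve(int numPartitions) {
        switch (kind) {
            case SOME:
                return partitions; // returned as given; no range check in this sketch
            case ALL:
                Set<Integer> all = new TreeSet<>();
                for (int p = 0; p < numPartitions; p++) {
                    all.add(p);
                }
                return all;
            default: // NONE: no targets, i.e. drop the record
                return Collections.emptySet();
        }
    }
}
```

This shape makes broadcasting explicit instead of overloading an empty Optional or an empty list, which is Chris's point above; the thread nevertheless settled on leaving broadcasting to partitioner implementations (Bruno, Aug 30).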

It's also worth noting that there's no hard requirement to add sugar for broadcasting to all topic partitions, since the API already provides the number of topic partitions available when calling a stream partitioner. If we can't find a clean way to add this support, it might be better to leave it out and just let people implement it themselves with a line of Java 8 streams:

    return IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());

Also worth noting is that there may be a bug in the default implementation of the new StreamPartitioner::partitions method, since it doesn't appear to correctly pick up on null return values from the partition method and translate them into empty lists.

Cheers,

Chris

On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Sagar,

I do not see an issue with using an empty list together with an empty Optional. A non-empty Optional that contains an empty list would just say that there is no partition to which the record should be sent. If there is no partition, the record is dropped.

An empty Optional might be a way to say: broadcast to all partitions.

Alternatively (to make it more explicit), you could return an object with an enum and a list of partitions. The enum could have values SOME, ALL, and NONE. If the value is SOME, the list of partitions contains the partitions to which to send the record. If the value of the enum is ALL or NONE, the list of partitions is not used and might even be null (since in that case the list should not be used, and it would be a bug to use it).

Best,
Bruno

On 24.08.22 20:11, Sagar wrote:

Thank you Bruno/Matthew for your comments.

I agree that using null does seem error-prone. However, I think using a singleton list of [-1] might be better in terms of usability. I am saying this because the KIP also has a provision to return an empty list to refer to dropping the record. So an empty Optional and an empty list would have totally different meanings, which could get confusing.

Let me know what you think.

Thanks!
Sagar.

On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich <matthew.dedetr...@aiven.io.invalid> wrote:

I also concur with this: having an Optional in the type makes it very clear what's going on and better signifies an absence of value (or in this case the broadcast value).

--
Matthew de Detrich
Aiven Deutschland GmbH
Immanuelkirchstraße 26, 10405 Berlin
Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
m: +491603708037
w: aiven.io e: matthew.dedetr...@aiven.io

On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:

2.
I would prefer changing the return type of partitions() to Optional<List<Integer>> and using Optional.empty() as the broadcast value. IMO, the chances that an implementation returns null due to a bug are much higher than the chances that it returns an empty Optional due to a bug.
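Because broadcasting was ultimately left to the partitioner implementation rather than encoded as `Optional.empty()`, Chris's one-liner carries over almost unchanged, assuming the `Optional<Set<Integer>>` return type the thread converged on: collect into a set instead of a list and wrap it in a non-empty Optional. A minimal sketch under that assumption; `Broadcast`, `toAllPartitions`, and the `ctrl:` key-prefix policy in `route` are hypothetical illustrations, not Kafka Streams API.

```java
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helpers; not part of Kafka Streams.
final class Broadcast {
    private Broadcast() {}

    // Explicit broadcast: all partitions 0..numPartitions-1 in a non-empty Optional,
    // so it cannot be confused with Optional.empty() ("use default partitioning").
    static Optional<Set<Integer>> toAllPartitions(int numPartitions) {
        if (numPartitions <= 0) {
            // An empty set would mean "drop", so refuse rather than silently dropping.
            throw new IllegalArgumentException("numPartitions must be positive: " + numPartitions);
        }
        return Optional.of(IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet()));
    }

    // Example policy (hypothetical): broadcast keys starting with "ctrl:",
    // fall back to default partitioning for everything else.
    static Optional<Set<Integer>> route(String key, int numPartitions) {
        if (key != null && key.startsWith("ctrl:")) {
            return toAllPartitions(numPartitions);
        }
        return Optional.empty();
    }
}
```

For example, `Broadcast.route("ctrl:flush", 4)` yields all four partitions, while an ordinary key or a null key yields `Optional.empty()` and leaves the choice to the producer's default partitioning.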