Hi Mathias, I did save it. The changes are added under Public Interfaces (Pt#2 about enhancing KeyQueryMetadata with partitions method) and throwing IllegalArgumentException when StreamPartitioner#partitions method returns multiple partitions for just FK-join instead of the earlier decided FK-Join and IQ.
The background is that for IQ, if the users have multi casted records to multiple partitions during ingestion but the fetch returns only a single partition, then it would be wrong. That's why the restriction was lifted for IQ and that's the reason KeyQueryMetadata now has another partitions() method to signify the same. FK-Join also has a similar case, but while reviewing it was felt that FK-Join on it's own is fairly complicated and we don't need this feature right away so the restriction still exists. Thanks! Sagar. On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote: > I don't see any update on the wiki about it. Did you forget to hit "save"? > > Can you also provide some background? I am not sure right now if I > understand the proposed changes? > > > -Matthias > > On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote: > > Thanks Sagar, this makes sense to me -- we clearly need additional > changes > > to > > avoid breaking IQ when using this feature, but I agree with continuing to > > restrict > > FKJ since they wouldn't stop working without it, and would become much > > harder > > to reason about (than they already are) if we did enable them to use it. > > > > And of course, they can still multicast the final results of a FKJ, they > > just can't > > mess with the internal workings of it in this way. > > > > On Tue, Dec 6, 2022 at 9:48 AM Sagar <sagarmeansoc...@gmail.com> wrote: > > > >> Hi All, > >> > >> I made a couple of edits to the KIP which came up during the code > review. > >> Changes at a high level are: > >> > >> 1) KeyQueryMetada enhanced to have a new method called partitions(). > >> 2) Lifting the restriction of a single partition for IQ. Now the > >> restriction holds only for FK Join. > >> > >> Updated KIP: > >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356 > >> > >> Thanks! > >> Sagar. > >> > >> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sagarmeansoc...@gmail.com> > wrote: > >> > >>> Thanks Bruno, > >>> > >>> Marking this as accepted. > >>> > >>> Thanks everyone for their comments/feedback. > >>> > >>> Thanks! > >>> Sagar. > >>> > >>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cado...@apache.org> > >> wrote: > >>> > >>>> Hi Sagar, > >>>> > >>>> Thanks for the update and the PR! > >>>> > >>>> +1 (binding) > >>>> > >>>> Best, > >>>> Bruno > >>>> > >>>> On 10.09.22 18:57, Sagar wrote: > >>>>> Hi Bruno, > >>>>> > >>>>> Thanks, I think these changes make sense to me. I have updated the > KIP > >>>>> accordingly. > >>>>> > >>>>> Thanks! > >>>>> Sagar. > >>>>> > >>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cado...@apache.org> > >>>> wrote: > >>>>> > >>>>>> Hi Sagar, > >>>>>> > >>>>>> I would not drop the support for dropping records. I would also not > >>>>>> return null from partitions(). Maybe an Optional can help here. An > >>>> empty > >>>>>> Optional would mean to use the default partitioning behavior of the > >>>>>> producer. So we would have: > >>>>>> > >>>>>> - non-empty Optional, non-empty list of integers: partitions to send > >>>> the > >>>>>> record to > >>>>>> - non-empty Optional, empty list of integers: drop the record > >>>>>> - empty Optional: use default behavior > >>>>>> > >>>>>> What do other think? > >>>>>> > >>>>>> Best, > >>>>>> Bruno > >>>>>> > >>>>>> On 02.09.22 13:53, Sagar wrote: > >>>>>>> Hello Bruno/Chris, > >>>>>>> > >>>>>>> Since these are the last set of changes(I am assuming haha), it > >> would > >>>> be > >>>>>>> great if you could review the 2 options from above so that we can > >>>> close > >>>>>> the > >>>>>>> voting. Of course I am happy to incorporate any other requisite > >>>> changes. > >>>>>>> > >>>>>>> Thanks! > >>>>>>> Sagar. > >>>>>>> > >>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> > >>>>>> wrote: > >>>>>>> > >>>>>>>> Thanks Bruno for the great points. > >>>>>>>> > >>>>>>>> I see 2 options here => > >>>>>>>> > >>>>>>>> 1) As Chris suggested, drop the support for dropping records in > the > >>>>>>>> partitioner. That way, an empty list could signify the usage of a > >>>>>> default > >>>>>>>> partitioner. Also, if the deprecated partition() method returns > >> null > >>>>>>>> thereby signifying the default partitioner, the partitions() can > >>>> return > >>>>>> an > >>>>>>>> empty list i.e default partitioner. > >>>>>>>> > >>>>>>>> 2) OR we treat a null return type of partitions() method to > signify > >>>> the > >>>>>>>> usage of the default partitioner. In the default implementation of > >>>>>>>> partitions() method, if partition() returns null, then even > >>>> partitions() > >>>>>>>> can return null(instead of an empty list). The RecordCollectorImpl > >>>> code > >>>>>> can > >>>>>>>> also be modified accordingly. @Chris, to your point, we can even > >> drop > >>>>>> the > >>>>>>>> support of dropping of records. It came up during KIP discussion, > >>>> and I > >>>>>>>> thought it might be a useful feature. Let me know what you think. > >>>>>>>> > >>>>>>>> 3) Lastly about the partition number check. I wanted to avoid the > >>>>>> throwing > >>>>>>>> of exception so I thought adding it might be a useful feature. But > >> as > >>>>>> you > >>>>>>>> pointed out, if it can break backwards compatibility, it's better > >> to > >>>>>> remove > >>>>>>>> it. > >>>>>>>> > >>>>>>>> Thanks! > >>>>>>>> Sagar. > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton > >>>> <chr...@aiven.io.invalid> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we > >> actually > >>>>>> need > >>>>>>>>> support for dropping records in the partitioner? It doesn't seem > >>>>>> necessary > >>>>>>>>> based on the motivation for the KIP. If we remove that feature, > we > >>>>>> could > >>>>>>>>> handle null and/or empty lists by using the default partitioning, > >>>>>>>>> equivalent to how we handle null return values from the existing > >>>>>> partition > >>>>>>>>> method today. > >>>>>>>>> > >>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna < > cado...@apache.org > >>> > >>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Sagar, > >>>>>>>>>> > >>>>>>>>>> Thank you for the updates! > >>>>>>>>>> > >>>>>>>>>> I do not intend to prolong this vote thread more than needed, > >> but I > >>>>>>>>>> still have some points. > >>>>>>>>>> > >>>>>>>>>> The deprecated partition method can return null if the default > >>>>>>>>>> partitioning logic of the producer should be used. > >>>>>>>>>> With the new method partitions() it seems that it is not > possible > >>>> to > >>>>>> use > >>>>>>>>>> the default partitioning logic, anymore. > >>>>>>>>>> > >>>>>>>>>> Also, in the default implementation of method partitions(), a > >>>> record > >>>>>>>>>> that would use the default partitioning logic in method > >> partition() > >>>>>>>>>> would be dropped, which would break backward compatibility since > >>>>>> Streams > >>>>>>>>>> would always call the new method partitions() even though the > >> users > >>>>>>>>>> still implement the deprecated method partition(). > >>>>>>>>>> > >>>>>>>>>> I have a last point that we should probably discuss on the PR > and > >>>> not > >>>>>> on > >>>>>>>>>> the KIP but since you added the code in the KIP I need to > mention > >>>> it. > >>>>>> I > >>>>>>>>>> do not think you should check the validity of the partition > >> number > >>>>>> since > >>>>>>>>>> the ProducerRecord does the same check and throws an exception. > >> If > >>>>>>>>>> Streams adds the same check but does not throw, the behavior is > >> not > >>>>>>>>>> backward compatible. > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Bruno > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On 30.08.22 12:43, Sagar wrote: > >>>>>>>>>>> Thanks Bruno/Chris, > >>>>>>>>>>> > >>>>>>>>>>> Even I agree that might be better to keep it simple like the > way > >>>>>> Chris > >>>>>>>>>>> suggested. I have updated the KIP accordingly. I made couple of > >>>> minor > >>>>>>>>>>> changes to the KIP: > >>>>>>>>>>> > >>>>>>>>>>> 1) One of them being the change of return type of partitions > >>>> method > >>>>>>>>> from > >>>>>>>>>>> List to Set. This is to ensure that in case the implementation > >> of > >>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate > >>>>>>>>>>> partition numbers, we won't have duplicates thereby not trying > >> to > >>>>>>>>> send to > >>>>>>>>>>> the same partition multiple times due to this. > >>>>>>>>>>> 2) I also added a check to send the record only to valid > >> partition > >>>>>>>>>> numbers > >>>>>>>>>>> and log and drop when the partition number is invalid. This is > >>>> again > >>>>>>>>> to > >>>>>>>>>>> prevent errors for cases when the StreamPartitioner > >> implementation > >>>>>> has > >>>>>>>>>> some > >>>>>>>>>>> bugs (since there are no validations as such). > >>>>>>>>>>> 3) I also updated the Test Plan section based on the suggestion > >>>> from > >>>>>>>>>> Bruno. > >>>>>>>>>>> 4) I updated the default implementation of partitions method > >>>> based on > >>>>>>>>> the > >>>>>>>>>>> great catch from Chris! > >>>>>>>>>>> > >>>>>>>>>>> Let me know if it looks fine now. > >>>>>>>>>>> > >>>>>>>>>>> Thanks! > >>>>>>>>>>> Sagar. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna < > >> cado...@apache.org > >>>>> > >>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi, > >>>>>>>>>>>> > >>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and leave > >>>> the > >>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I think > >>>> that > >>>>>> is > >>>>>>>>>>>> the cleanest option. > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> Bruno > >>>>>>>>>>>> > >>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote: > >>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I think it'd be useful to be more explicit about broadcasting > >> to > >>>>>> all > >>>>>>>>>>>> topic > >>>>>>>>>>>>> partitions rather than add implicit behavior for empty cases > >>>> (empty > >>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach > would > >>>>>>>>> address > >>>>>>>>>>>> that > >>>>>>>>>>>>> nicely. > >>>>>>>>>>>>> > >>>>>>>>>>>>> It's also worth noting that there's no hard requirement to > add > >>>>>> sugar > >>>>>>>>>> for > >>>>>>>>>>>>> broadcasting to all topic partitions since the API already > >>>> provides > >>>>>>>>> the > >>>>>>>>>>>>> number of topic partitions available when calling a stream > >>>>>>>>> partitioner. > >>>>>>>>>>>> If > >>>>>>>>>>>>> we can't find a clean way to add this support, it might be > >>>> better > >>>>>> to > >>>>>>>>>>>> leave > >>>>>>>>>>>>> it out and just let people implement this themselves with a > >>>> line of > >>>>>>>>>> Java > >>>>>>>>>>>> 8 > >>>>>>>>>>>>> streams: > >>>>>>>>>>>>> > >>>>>>>>>>>>> return IntStream.range(0, > >>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList()); > >>>>>>>>>>>>> > >>>>>>>>>>>>> Also worth noting that there may be a bug in the default > >>>>>>>>> implementation > >>>>>>>>>>>> for > >>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it > doesn't > >>>>>>>>> appear > >>>>>>>>>> to > >>>>>>>>>>>>> correctly pick up on null return values from the partition > >>>> method > >>>>>>>>> and > >>>>>>>>>>>>> translate them into empty lists. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Chris > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna < > >>>> cado...@apache.org> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Sagar, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I do not see an issue with using an empty list together with > >> an > >>>>>>>>> empty > >>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list > >>>> would > >>>>>>>>> just > >>>>>>>>>>>>>> say that there is no partition to which the record should be > >>>> sent. > >>>>>>>>> If > >>>>>>>>>>>>>> there is no partition, the record is dropped. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all > >>>>>>>>> partitions. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could > return > >>>> an > >>>>>>>>>> object > >>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have > >>>> values > >>>>>>>>>> SOME, > >>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions > >>>>>>>>> contains > >>>>>>>>>> the > >>>>>>>>>>>>>> partitions to which to send the record. If the value of the > >>>> enum > >>>>>> is > >>>>>>>>>> ALL > >>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be > even > >>>> null > >>>>>>>>>>>>>> (since in that case the list should not be used and it would > >>>> be a > >>>>>>>>> bug > >>>>>>>>>> to > >>>>>>>>>>>>>> use it). > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Bruno > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote: > >>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I agree using null does seem error prone. However I think > >>>> using a > >>>>>>>>>>>>>> singleton > >>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I am > >>>> saying > >>>>>>>>> this > >>>>>>>>>>>>>>> because the KIP also has a provision to return an empty > list > >>>> to > >>>>>>>>> refer > >>>>>>>>>>>> to > >>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty > list > >>>> have > >>>>>>>>>>>> totally > >>>>>>>>>>>>>>> different meanings which could get confusing. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Let me know what you think. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks! > >>>>>>>>>>>>>>> Sagar. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich > >>>>>>>>>>>>>>> <matthew.dedetr...@aiven.io.invalid> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I also concur with this, having an Optional in the type > >>>> makes it > >>>>>>>>>> very > >>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of > >>>> value > >>>>>>>>> (or > >>>>>>>>>> in > >>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>> case the broadcast value). > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>> Matthew de Detrich > >>>>>>>>>>>>>>>> Aiven Deutschland GmbH > >>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin > >>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen > >>>>>>>>>>>>>>>> m: +491603708037 > >>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetr...@aiven.io > >>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, > wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 2. > >>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions() > to > >>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the > >>>>>>>>> broadcast > >>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation returns > >> null > >>>> due > >>>>>>>>> to > >>>>>>>>>> a > >>>>>>>>>>>>>> bug > >>>>>>>>>>>>>>>>> is much higher than that an implementation returns an > >> empty > >>>>>>>>>> Optional > >>>>>>>>>>>>>> due > >>>>>>>>>>>>>>>>> to a bug. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > >