Hi Bruno, Thanks, I think these changes make sense to me. I have updated the KIP accordingly.
Thanks! Sagar. On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cado...@apache.org> wrote: > Hi Sagar, > > I would not drop the support for dropping records. I would also not > return null from partitions(). Maybe an Optional can help here. An empty > Optional would mean to use the default partitioning behavior of the > producer. So we would have: > > - non-empty Optional, non-empty list of integers: partitions to send the > record to > - non-empty Optional, empty list of integers: drop the record > - empty Optional: use default behavior > > What do other think? > > Best, > Bruno > > On 02.09.22 13:53, Sagar wrote: > > Hello Bruno/Chris, > > > > Since these are the last set of changes(I am assuming haha), it would be > > great if you could review the 2 options from above so that we can close > the > > voting. Of course I am happy to incorporate any other requisite changes. > > > > Thanks! > > Sagar. > > > > On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> > wrote: > > > >> Thanks Bruno for the great points. > >> > >> I see 2 options here => > >> > >> 1) As Chris suggested, drop the support for dropping records in the > >> partitioner. That way, an empty list could signify the usage of a > default > >> partitioner. Also, if the deprecated partition() method returns null > >> thereby signifying the default partitioner, the partitions() can return > an > >> empty list i.e default partitioner. > >> > >> 2) OR we treat a null return type of partitions() method to signify the > >> usage of the default partitioner. In the default implementation of > >> partitions() method, if partition() returns null, then even partitions() > >> can return null(instead of an empty list). The RecordCollectorImpl code > can > >> also be modified accordingly. @Chris, to your point, we can even drop > the > >> support of dropping of records. It came up during KIP discussion, and I > >> thought it might be a useful feature. Let me know what you think. > >> > >> 3) Lastly about the partition number check. I wanted to avoid the > throwing > >> of exception so I thought adding it might be a useful feature. But as > you > >> pointed out, if it can break backwards compatibility, it's better to > remove > >> it. > >> > >> Thanks! > >> Sagar. > >> > >> > >> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chr...@aiven.io.invalid> > >> wrote: > >> > >>> +1 to Bruno's concerns about backward compatibility. Do we actually > need > >>> support for dropping records in the partitioner? It doesn't seem > necessary > >>> based on the motivation for the KIP. If we remove that feature, we > could > >>> handle null and/or empty lists by using the default partitioning, > >>> equivalent to how we handle null return values from the existing > partition > >>> method today. > >>> > >>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cado...@apache.org> > wrote: > >>> > >>>> Hi Sagar, > >>>> > >>>> Thank you for the updates! > >>>> > >>>> I do not intend to prolong this vote thread more than needed, but I > >>>> still have some points. > >>>> > >>>> The deprecated partition method can return null if the default > >>>> partitioning logic of the producer should be used. > >>>> With the new method partitions() it seems that it is not possible to > use > >>>> the default partitioning logic, anymore. > >>>> > >>>> Also, in the default implementation of method partitions(), a record > >>>> that would use the default partitioning logic in method partition() > >>>> would be dropped, which would break backward compatibility since > Streams > >>>> would always call the new method partitions() even though the users > >>>> still implement the deprecated method partition(). > >>>> > >>>> I have a last point that we should probably discuss on the PR and not > on > >>>> the KIP but since you added the code in the KIP I need to mention it. > I > >>>> do not think you should check the validity of the partition number > since > >>>> the ProducerRecord does the same check and throws an exception. If > >>>> Streams adds the same check but does not throw, the behavior is not > >>>> backward compatible. > >>>> > >>>> Best, > >>>> Bruno > >>>> > >>>> > >>>> On 30.08.22 12:43, Sagar wrote: > >>>>> Thanks Bruno/Chris, > >>>>> > >>>>> Even I agree that might be better to keep it simple like the way > Chris > >>>>> suggested. I have updated the KIP accordingly. I made couple of minor > >>>>> changes to the KIP: > >>>>> > >>>>> 1) One of them being the change of return type of partitions method > >>> from > >>>>> List to Set. This is to ensure that in case the implementation of > >>>>> StreamPartitoner is buggy and ends up returning duplicate > >>>>> partition numbers, we won't have duplicates thereby not trying to > >>> send to > >>>>> the same partition multiple times due to this. > >>>>> 2) I also added a check to send the record only to valid partition > >>>> numbers > >>>>> and log and drop when the partition number is invalid. This is again > >>> to > >>>>> prevent errors for cases when the StreamPartitioner implementation > has > >>>> some > >>>>> bugs (since there are no validations as such). > >>>>> 3) I also updated the Test Plan section based on the suggestion from > >>>> Bruno. > >>>>> 4) I updated the default implementation of partitions method based on > >>> the > >>>>> great catch from Chris! > >>>>> > >>>>> Let me know if it looks fine now. > >>>>> > >>>>> Thanks! > >>>>> Sagar. > >>>>> > >>>>> > >>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cado...@apache.org> > >>>> wrote: > >>>>> > >>>>>> Hi, > >>>>>> > >>>>>> I am favour of discarding the sugar for broadcasting and leave the > >>>>>> broadcasting to the implementation as Chris suggests. I think that > is > >>>>>> the cleanest option. > >>>>>> > >>>>>> Best, > >>>>>> Bruno > >>>>>> > >>>>>> On 29.08.22 19:50, Chris Egerton wrote: > >>>>>>> Hi all, > >>>>>>> > >>>>>>> I think it'd be useful to be more explicit about broadcasting to > all > >>>>>> topic > >>>>>>> partitions rather than add implicit behavior for empty cases (empty > >>>>>>> optional, empty list, etc.). The suggested enum approach would > >>> address > >>>>>> that > >>>>>>> nicely. > >>>>>>> > >>>>>>> It's also worth noting that there's no hard requirement to add > sugar > >>>> for > >>>>>>> broadcasting to all topic partitions since the API already provides > >>> the > >>>>>>> number of topic partitions available when calling a stream > >>> partitioner. > >>>>>> If > >>>>>>> we can't find a clean way to add this support, it might be better > to > >>>>>> leave > >>>>>>> it out and just let people implement this themselves with a line of > >>>> Java > >>>>>> 8 > >>>>>>> streams: > >>>>>>> > >>>>>>> return IntStream.range(0, > >>>>>>> numPartitions).boxed().collect(Collectors.toList()); > >>>>>>> > >>>>>>> Also worth noting that there may be a bug in the default > >>> implementation > >>>>>> for > >>>>>>> the new StreamPartitioner::partitions method, since it doesn't > >>> appear > >>>> to > >>>>>>> correctly pick up on null return values from the partition method > >>> and > >>>>>>> translate them into empty lists. > >>>>>>> > >>>>>>> Cheers, > >>>>>>> > >>>>>>> Chris > >>>>>>> > >>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <cado...@apache.org> > >>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Sagar, > >>>>>>>> > >>>>>>>> I do not see an issue with using an empty list together with an > >>> empty > >>>>>>>> Optional. A non-empty Optional that contains an empty list would > >>> just > >>>>>>>> say that there is no partition to which the record should be sent. > >>> If > >>>>>>>> there is no partition, the record is dropped. > >>>>>>>> > >>>>>>>> An empty Optional might be a way to say, broadcast to all > >>> partitions. > >>>>>>>> > >>>>>>>> Alternatively -- to make it more explicit -- you could return an > >>>> object > >>>>>>>> with an enum and a list of partitions. The enum could have values > >>>> SOME, > >>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions > >>> contains > >>>> the > >>>>>>>> partitions to which to send the record. If the value of the enum > is > >>>> ALL > >>>>>>>> or NONE, the list of partitions is not used and might be even null > >>>>>>>> (since in that case the list should not be used and it would be a > >>> bug > >>>> to > >>>>>>>> use it). > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Bruno > >>>>>>>> > >>>>>>>> On 24.08.22 20:11, Sagar wrote: > >>>>>>>>> Thank you Bruno/Matthew for your comments. > >>>>>>>>> > >>>>>>>>> I agree using null does seem error prone. However I think using a > >>>>>>>> singleton > >>>>>>>>> list of [-1] might be better in terms of usability, I am saying > >>> this > >>>>>>>>> because the KIP also has a provision to return an empty list to > >>> refer > >>>>>> to > >>>>>>>>> dropping the record. So, an empty optional and an empty list have > >>>>>> totally > >>>>>>>>> different meanings which could get confusing. > >>>>>>>>> > >>>>>>>>> Let me know what you think. > >>>>>>>>> > >>>>>>>>> Thanks! > >>>>>>>>> Sagar. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich > >>>>>>>>> <matthew.dedetr...@aiven.io.invalid> wrote: > >>>>>>>>> > >>>>>>>>>> I also concur with this, having an Optional in the type makes it > >>>> very > >>>>>>>>>> clear what’s going on and better signifies an absence of value > >>> (or > >>>> in > >>>>>>>> this > >>>>>>>>>> case the broadcast value). > >>>>>>>>>> > >>>>>>>>>> -- > >>>>>>>>>> Matthew de Detrich > >>>>>>>>>> Aiven Deutschland GmbH > >>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin > >>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B > >>>>>>>>>> > >>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen > >>>>>>>>>> m: +491603708037 > >>>>>>>>>> w: aiven.io e: matthew.dedetr...@aiven.io > >>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote: > >>>>>>>>>>> > >>>>>>>>>>> 2. > >>>>>>>>>>> I would prefer changing the return type of partitions() to > >>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the > >>> broadcast > >>>>>>>>>>> value. IMO, The chances that an implementation returns null due > >>> to > >>>> a > >>>>>>>> bug > >>>>>>>>>>> is much higher than that an implementation returns an empty > >>>> Optional > >>>>>>>> due > >>>>>>>>>>> to a bug. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > > >