Just FYI, I filed a ticket to improve the Consumer#committed API. You can track this on https://issues.apache.org/jira/browse/KAFKA-12485
On Tue, Mar 16, 2021 at 2:16 AM Mazen Ezzeddine <mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Great, thanks so much for the detailed answer.
>
> Best,
> ________________________________
> From: Sophie Blee-Goldman <sop...@confluent.io.INVALID>
> Sent: Tuesday, March 16, 2021 5:58 AM
> To: users@kafka.apache.org <users@kafka.apache.org>
> Subject: Re: Slightly Modified Sticky Assignor.
>
> Hey Mazen,
>
> The easiest way to approach this is probably to pass in a reference to the
> associated Consumer and then just call one of the *Consumer#committed*
> methods, which return the OffsetAndMetadata.
>
> But I'm guessing your underlying question may be about how to get that
> reference to the Consumer in the first place. There really isn't very good
> support for this in the ConsumerPartitionAssignor interface, since it relies
> on reflection to instantiate the assignor with the default constructor. I
> would recommend making your custom ConsumerPartitionAssignor implement the
> Configurable interface, and then using the configs you pass in to the
> Consumer to ultimately get a handle on it.
> Since you have to provide the configs to construct the Consumer in the first
> place, you might need a layer of indirection, for example:
>
> class ConsumerProvider {
>     private Consumer consumer;
>
>     public void setConsumer(Consumer consumer) { this.consumer = consumer; }
>
>     public Consumer getConsumer() { return consumer; }
> }
>
> // main code
> ConsumerProvider provider = new ConsumerProvider();
> consumerConfig.put("MY_CONSUMER_PROVIDER", provider);
> Consumer consumer = new KafkaConsumer(consumerConfig, ...);
> provider.setConsumer(consumer);
>
> class MyAssignor implements ConsumerPartitionAssignor, Configurable {
>
>     private ConsumerProvider consumerProvider;
>
>     @Override
>     public void configure(Map<String, ?> configs) {
>         this.consumerProvider = (ConsumerProvider) configs.get("MY_CONSUMER_PROVIDER");
>     }
>
>     @Override
>     public ByteBuffer subscriptionUserData(Set<String> topics) {
>         offsets = consumerProvider.getConsumer().committed(...);
>         // serialize the offsets into the returned user data
>     }
> }
>
> Just a warning, I haven't actually tested this out, but the general idea of
> using configs should work.
>
> I know you said "seamless" and this is anything but :/ Maybe I'm tired and
> missing something obvious, but clearly there's room for improvement here. You
> can file a JIRA ticket to improve the partition assignor experience and make
> it easier to set up (and even work on this yourself if you're interested).
>
> Unfortunately you generally want to keep the subscriptionUserData() method
> pretty light, and *Consumer#committed* makes a remote call to the server. To
> be honest, I'm not totally sure why that is the case, since the Consumer
> should know what offsets it has committed for which partitions... maybe
> someone else can jump in on the reasoning behind that. This has come up
> before, so it's worth investigating whether we can just return the cached
> offsets if they're available and only make a remote call for
> *Consumer#committed* if absolutely necessary.
> I'll try to follow up on that.
>
> On Tue, Mar 9, 2021 at 9:26 AM Mazen Ezzeddine <mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:
>
> > Hi all,
> >
> > I am implementing a custom partition assignor that differs slightly from
> > the sticky assignor. In the sticky assignor, each consumer sends the set
> > of owned partitions as part of its subscription message. This happens in
> > subscriptionUserData by calling the serializeTopicPartitionAssignment
> > method etc.
> >
> > My question is: what is the most seamless way to get offset information
> > (e.g., last committed offset) for each owned partition from within the
> > subscriptionUserData method, or more generally from within the
> > stickyAssignor class, preferably using public APIs?
> >
> > Thank you.
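
For anyone finding this thread later: the config-based indirection suggested in the reply above can be sketched in plain Java without the Kafka client on the classpath. Everything in it is a hypothetical stand-in: OffsetLookup plays the role of the single Consumer#committed call, SketchAssignor mirrors only the configure(Map) hook of Configurable, and the key "my.consumer.provider" is made up. The point it tries to illustrate is ordering: the assignor is configured while the consumer is still being constructed, so the consumer can only be resolved lazily, at the moment the offsets are needed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the one consumer call the assignor needs.
// With the real client this role would be played by Consumer#committed.
interface OffsetLookup {
    Map<Integer, Long> committed(String topic); // partition -> committed offset
}

// Holder created before the consumer exists and filled in afterwards.
final class ConsumerProvider {
    private volatile OffsetLookup consumer;

    void setConsumer(OffsetLookup consumer) {
        this.consumer = consumer;
    }

    OffsetLookup getConsumer() {
        OffsetLookup c = consumer;
        if (c == null) {
            throw new IllegalStateException("consumer has not been set yet");
        }
        return c;
    }
}

// Mirrors the shape of an assignor that also implements Configurable:
// configure(Map) sees the same config map that was handed to the consumer.
final class SketchAssignor {
    static final String PROVIDER_CONFIG = "my.consumer.provider"; // made-up key

    private ConsumerProvider provider;

    void configure(Map<String, ?> configs) {
        Object value = configs.get(PROVIDER_CONFIG);
        if (!(value instanceof ConsumerProvider)) {
            throw new IllegalArgumentException(PROVIDER_CONFIG + " must be a ConsumerProvider");
        }
        this.provider = (ConsumerProvider) value;
    }

    // In a real assignor this lookup would happen inside subscriptionUserData.
    // The consumer is resolved here, not in configure(), because it did not
    // exist yet when configure() ran.
    Map<Integer, Long> lookupCommitted(String topic) {
        return provider.getConsumer().committed(topic);
    }
}

final class ProviderSketch {
    public static void main(String[] args) {
        ConsumerProvider provider = new ConsumerProvider();
        Map<String, Object> config = new HashMap<>();
        config.put(SketchAssignor.PROVIDER_CONFIG, provider);

        // Step 1: the assignor is configured with the provider (consumer still unset).
        SketchAssignor assignor = new SketchAssignor();
        assignor.configure(config);

        // Step 2: the "consumer" is registered once it exists (a fake one here).
        Map<Integer, Long> fakeOffsets = new TreeMap<>();
        fakeOffsets.put(0, 42L);
        fakeOffsets.put(1, 7L);
        provider.setConsumer(topic -> fakeOffsets);

        // Step 3: the lookup now succeeds.
        System.out.println(assignor.lookupCommitted("orders")); // prints {0=42, 1=7}
    }
}
```

The caveat from the reply still applies to the real client: Consumer#committed makes a remote call, so a real assignor would probably want to keep this lookup cheap or cache the result.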