Just FYI, I filed a ticket to improve the Consumer#committed API. You can
track this on https://issues.apache.org/jira/browse/KAFKA-12485

On Tue, Mar 16, 2021 at 2:16 AM Mazen Ezzeddine <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Great, thanks so much for the detailed answer.
>
> Best,
> ________________________________
> From: Sophie Blee-Goldman <sop...@confluent.io.INVALID>
> Sent: Tuesday, March 16, 2021 5:58 AM
> To: users@kafka.apache.org <users@kafka.apache.org>
> Subject: Re: Slightly Modified Sticky Assignor.
>
> Hey Mazen,
>
> The easiest way to approach this is probably to pass in a reference to the
> associated Consumer and
> then just call one of the *Consumer#committed* methods which return the
> OffsetAndMetadata.
>
> But I'm guessing your underlying question may be about how to get that
> reference to the Consumer in
> the first place. There really isn't very good support for this in the
> ConsumerPartitionAssignor interface
> since it relies on reflection to instantiate the assignor with the default
> constructor. I would recommend
> making your custom ConsumerPartitionAssignor implement the Configurable
> interface, and then use
> the configs you pass in to the Consumer to ultimately get a handle on it.
> Since you have to provide the
> configs to construct the Consumer in the first place, you might need a
> layer of indirection: for example
>
> class ConsumerProvider {
>     private Consumer consumer;
>
>     public void setConsumer(Consumer consumer) { this.consumer = consumer; }
>
>     public Consumer getConsumer() { return consumer; }
> }
>
> // main code
> ConsumerProvider provider = new ConsumerProvider();
> consumerConfig.put("MY_CONSUMER_PROVIDER", provider);
> Consumer consumer = new KafkaConsumer(consumerConfig, ...);
> provider.setConsumer(consumer);
>
> class MyAssignor implements ConsumerPartitionAssignor, Configurable {
>
>     private ConsumerProvider consumerProvider;
>
>     @Override
>     public void configure(Map<String, ?> configs) {
>         // only store the provider here; the consumer is set on it later
>         this.consumerProvider =
>             (ConsumerProvider) configs.get("MY_CONSUMER_PROVIDER");
>     }
>
>     @Override
>     public ByteBuffer subscriptionUserData(Set<String> topics) {
>         offsets = consumerProvider.getConsumer().committed(...);
>         // encode the offsets and return them as the user data
>     }
> }
>
> Just a warning, I haven't actually tested this out, but the general idea of
> using configs should work.
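For anyone who wants to see the hand-off in isolation, here is a minimal sketch with no Kafka dependency. FakeConsumer, SketchAssignor, ProviderSketch and the "my.consumer.provider" key are hypothetical stand-ins, not Kafka types. It assumes the assignor is configured before setConsumer() has been called, which is why configure() only stores the provider and the consumer is looked up lazily.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the real Consumer; always reports the same offset.
class FakeConsumer {
    long committed(String topicPartition) {
        return 42L;
    }
}

// Mutable holder that can be placed in the configs before the consumer exists.
class ConsumerProvider {
    private volatile FakeConsumer consumer;

    void setConsumer(FakeConsumer consumer) {
        this.consumer = consumer;
    }

    FakeConsumer getConsumer() {
        return consumer;
    }
}

// Hypothetical stand-in for an assignor that implements Configurable.
class SketchAssignor {
    static final String PROVIDER_KEY = "my.consumer.provider"; // hypothetical key

    private ConsumerProvider provider;

    // Only store the provider here; the consumer may not be set on it yet.
    void configure(Map<String, ?> configs) {
        this.provider = (ConsumerProvider) configs.get(PROVIDER_KEY);
    }

    // Dereference lazily, at the point where the offset is actually needed.
    long committedOffset(String topicPartition) {
        FakeConsumer consumer = provider.getConsumer();
        if (consumer == null) {
            throw new IllegalStateException("consumer not set on the provider yet");
        }
        return consumer.committed(topicPartition);
    }
}

class ProviderSketch {
    public static void main(String[] args) {
        ConsumerProvider provider = new ConsumerProvider();
        Map<String, Object> configs = new HashMap<>();
        configs.put(SketchAssignor.PROVIDER_KEY, provider);

        SketchAssignor assignor = new SketchAssignor();
        assignor.configure(configs);              // runs before the consumer exists

        provider.setConsumer(new FakeConsumer()); // consumer becomes available later
        System.out.println(assignor.committedOffset("my-topic-0"));
    }
}
```

The null check matters in this arrangement: anything that touches the consumer inside configure() would see an empty provider.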
>
> I know you said "seamless" and this is anything but :/ Maybe I'm tired and
> missing something obvious, but
> clearly there's room for improvement here. You can file a JIRA ticket to
> improve the partition assignor
> experience and make it easier to set up (and even work on this yourself if
> you're interested).
>
> Unfortunately you generally want to keep the subscriptionUserData() method
> pretty light, and this
> method will make a remote call to the server. To be honest, I'm not totally
> sure why that is the case, since
> the Consumer should know what offsets it's committed for which
> partitions...maybe someone else can
> jump in on the choice behind that. This has come up before, so it's worth
> investigating whether we can
> just return the cached offsets if they're available and only make a remote
> call for *Consumer#committed* if
> absolutely necessary. I'll try to follow up on that.
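Until something like that exists in the client, one way to keep subscriptionUserData() light is to remember offsets at commit time and fall back to the remote lookup only on a miss. This is a sketch only: CommittedOffsetCache and its methods are hypothetical and not part of the Kafka API, partitions are keyed by plain strings so it compiles without Kafka on the classpath, and the remote lookup is passed in as a function that would wrap the real Consumer#committed call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToLongFunction;

// Hypothetical helper: a local view of the offsets this consumer has committed.
class CommittedOffsetCache {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();
    private final ToLongFunction<String> remoteLookup;

    // remoteLookup is assumed to wrap the expensive remote call.
    CommittedOffsetCache(ToLongFunction<String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Call this after a commit for the partition has succeeded.
    void recordCommit(String topicPartition, long offset) {
        offsets.put(topicPartition, offset);
    }

    // Call this when the partition is revoked, so stale entries are not reused.
    void forget(String topicPartition) {
        offsets.remove(topicPartition);
    }

    // Return the cached offset if present; otherwise do the remote lookup once
    // and remember the result.
    long committed(String topicPartition) {
        Long cached = offsets.get(topicPartition);
        if (cached != null) {
            return cached;
        }
        long fetched = remoteLookup.applyAsLong(topicPartition);
        offsets.put(topicPartition, fetched);
        return fetched;
    }
}
```

The cached value is only trustworthy for partitions this consumer owns and commits itself, which is why the sketch drops entries on revocation. It also does not model a partition that has no committed offset at all.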
>
> On Tue, Mar 9, 2021 at 9:26 AM Mazen Ezzeddine <
> mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:
>
> > Hi all,
> >
> > I am implementing a custom partition assignor slightly different from the
> > sticky assignor. As you know, in the sticky assignor each consumer
> > sends the set of owned partitions as part of its subscription message.
> > This happens in the subscriptionUserData method by calling the
> > serializeTopicPartitionAssignment method etc…
> >
> > My question is: what is the most seamless way to get offset
> > information (e.g., last committed offset) for each owned partition from
> > within the subscriptionUserData method or generally from within the
> > stickyAssignor class, preferably using public APIs?
> >
> > Thank you.
> >
>
