Hi Rajiv,

I think it makes sense to return a read-only assignment. What we could
improve here is to add addPartition and removePartition methods to the
consumer. Then you wouldn't have to operate on the set returned by the
assignment() method at all.
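
Until something like that exists, the same effect can be had with a thin
wrapper. This is a minimal sketch: AssignmentEditor is a hypothetical class,
not part of the Kafka API. It is handed the consumer's assignment() and
assign() calls as functions (e.g. consumer::assignment and consumer::assign),
and it is generic over the partition type so it compiles without kafka-clients.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Kafka API: emulates the proposed
// addPartition/removePartition methods on top of assignment()/assign().
final class AssignmentEditor<T> {
    private final Supplier<? extends Collection<T>> current; // e.g. consumer::assignment
    private final Consumer<List<T>> assign;                   // e.g. consumer::assign

    AssignmentEditor(Supplier<? extends Collection<T>> current, Consumer<List<T>> assign) {
        this.current = current;
        this.assign = assign;
    }

    // Copies the read-only assignment, adds one partition if absent, reassigns.
    void addPartition(T partition) {
        List<T> next = new ArrayList<>(current.get());
        if (!next.contains(partition)) {
            next.add(partition);
            assign.accept(next);
        }
    }

    // Copies the read-only assignment, drops one partition if present, reassigns.
    void removePartition(T partition) {
        List<T> next = new ArrayList<>(current.get());
        if (next.remove(partition)) {
            assign.accept(next);
        }
    }
}
```

Both methods skip the assign() call when nothing changes, so a redundant
reassignment is never triggered.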

BTW, I think you can implement the PartitionAssignor interface to solve
your use case. I couldn't find the javadoc for that interface, but here is
the method you can use:

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     * @param metadata Current topic/broker metadata known by consumer
     * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
     * @return A map from the members to their respective assignment. This should have one entry
     *         for all members in the input subscription map.
     */
    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

The subscription map is keyed by each consumer's member id, so you can use
that key to identify the consumer and adjust its assignment in the map you
return.
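
To make that concrete without pulling in the consumer internals, here is a
plain-Java model of what an assign() implementation does: it receives each
member's subscribed topics keyed by member id and returns a partition list per
member. Everything here is an assumption for illustration: topics and
partitions are plain Strings ("topic-N"), a partition-count map stands in for
the Cluster metadata, and the strategy is a simple per-topic round-robin. A
real assignor would read partition counts from the Cluster argument and wrap
each list in an Assignment. Note that an assignor only runs when the consumer
uses subscribe() with group management, not manual assign().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model of an assignor, not Kafka's interface: each topic's
// partitions are dealt round-robin over the members subscribed to it.
final class RoundRobinModel {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, Set<String>> subscriptions) {
        // One entry per member, even if it ends up with no partitions.
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            result.put(member, new ArrayList<>());
        }
        // Visit topics and members in sorted order so the output is deterministic.
        for (String topic : new TreeSet<>(partitionsPerTopic.keySet())) {
            List<String> consumers = new ArrayList<>();
            for (String member : new TreeSet<>(subscriptions.keySet())) {
                if (subscriptions.get(member).contains(topic)) {
                    consumers.add(member);
                }
            }
            if (consumers.isEmpty()) {
                continue; // nobody subscribed to this topic
            }
            int count = partitionsPerTopic.get(topic);
            for (int p = 0; p < count; p++) {
                String owner = consumers.get(p % consumers.size());
                result.get(owner).add(topic + "-" + p);
            }
        }
        return result;
    }
}
```

With one topic of three partitions and members m1 and m2, m1 gets t-0 and t-2
and m2 gets t-1; this is where your own placement logic would replace the
modulo step.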




On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Hi Jason,
>
> The copying is not a problem in terms of performance; it's just annoying to
> write the extra code. My point is that since the client is already making a
> copy when it returns the set to me, why should it matter if I modify that
> copy? Creating an unmodifiable set on top of a copy seems redundant. It
> would be easiest for us as users to do something like this:
>
> // This already returns a copy of the underlying assignment, thus ensuring
> // that the internal data structures are protected.
> final Set<TopicPartition> partitions = consumer.assignment();
> // This is fine to modify since consumer.assignment() returns a copy.
> partitions.add(myNewTopicPartition);
> partitions.remove(topicPartitionToBeRemoved);
> consumer.assign(partitions);
>
> Instead, right now we have to do something like this:
>
> // This returns a copy of the underlying assignment wrapped in an
> // UnmodifiableSet, which seems redundant.
> final Set<TopicPartition> partitions = consumer.assignment();
> // We need this copy since consumer.assignment() is unmodifiable, even
> // though it is a copy.
> final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);
> yetAnotherCopy.add(myNewTopicPartition);
> yetAnotherCopy.remove(topicPartitionToBeRemoved);
> // assign() takes a List, so convert once more.
> List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> consumer.assign(wayTooManyCopies);
>
> Thanks,
> Rajiv
>
>
> On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hey Rajiv,
> >
> > I agree the Set/List inconsistency is a little unfortunate (another
> > annoying one is pause(), which uses varargs). I think we should probably
> > add the following variants:
> >
> > assign(Collection<TopicPartition>)
> > subscribe(Collection<String>)
> > pause(Collection<TopicPartition>)
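
As a small illustration of why Collection-typed variants help: a parameter
declared as Collection<T> accepts the Set returned by assignment() as well as
a List, so the caller never has to convert. ParamStyles and its methods are
hypothetical stand-ins for the consumer, not its actual API.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a consumer, contrasting the two parameter styles.
final class ParamStyles<T> {
    private Set<T> assigned = new LinkedHashSet<>();

    // List-only style: a caller holding a Set must copy it into a List first.
    void assignList(List<T> partitions) {
        assigned = new LinkedHashSet<>(partitions);
    }

    // Collection style: Sets and Lists both work; duplicates collapse here.
    void assignCollection(Collection<T> partitions) {
        assigned = new LinkedHashSet<>(partitions);
    }

    Set<T> current() {
        return assigned;
    }
}
```

Call sites then differ only in the extra copy: assignList(new ArrayList<>(set))
versus assignCollection(set).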
> >
> > I can open a JIRA to fix this. As for returning the unmodifiable set, I
> > can see your point, but I think it's a little dangerous for user code to
> > depend on being able to modify a collection returned from the API. Making
> > it immutable reduces the coupling with user code and gives us more
> > freedom in the future (not that we have any intention of changing the set
> > type, but we could). I think the way I might try to implement your use
> > case would be to maintain the assignment set yourself. You can make
> > changes to that set and always pass it to assign(), which would avoid the
> > need to use assignment(). Also, I probably wouldn't be overly concerned
> > about the copying overhead unless profiling shows that it is actually a
> > problem. Are your partition assignments generally very large?
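
A minimal sketch of that approach, assuming the application is the only thing
that changes the assignment: keep your own mutable set, edit it, and hand
assign() a fresh List each time. PartitionTracker and its methods are
hypothetical; in practice the assign callback would be consumer::assign.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical application-side owner of the assignment.
final class PartitionTracker<T> {
    private final Set<T> partitions = new LinkedHashSet<>();
    private final Consumer<List<T>> assign; // e.g. consumer::assign

    PartitionTracker(Consumer<List<T>> assign) {
        this.assign = assign;
    }

    // Applies a batch of additions and removals, then reassigns once.
    // Returns false (and skips assign) when the set did not change.
    boolean update(Collection<T> toAdd, Collection<T> toRemove) {
        boolean changed = partitions.addAll(toAdd);
        changed |= partitions.removeAll(toRemove);
        if (changed) {
            assign.accept(new ArrayList<>(partitions));
        }
        return changed;
    }

    // Read-only view for code that only needs to inspect the assignment.
    Set<T> view() {
        return Collections.unmodifiableSet(partitions);
    }
}
```

Batching additions and removals into one update() call means one assign()
per change in your own logic rather than one per partition.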
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > partitions. We consume partitions based on our own logic instead of
> > > delegating that to Kafka. One of our use cases is handling a change in
> > > the partitions that we consume. This means that sometimes we need to
> > > consume additional partitions, and other times we need to stop
> > > consuming (not pause, but stop entirely) some of the partitions that we
> > > are currently polling.
> > >
> > > The semantics of the assign() call at
> > > http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > are that we need to provide the entire list of partitions. So when we
> > > want to add or remove partitions, we call the assignment() method to get
> > > the existing set of TopicPartitions being polled, modify this set, and
> > > pass it back to the assign() call. However, it seems odd that the
> > > assign() call takes a List<TopicPartition> whereas the assignment()
> > > call returns a Set<TopicPartition>. Further, the Set returned by the
> > > method is unmodifiable, which means that to change it we need to create
> > > a new List/Set from it and then modify the new collection. Looking at
> > > the code for the assignment() method further shows that a copy of the
> > > underlying set is made and then wrapped in an unmodifiable set. The
> > > wrapping seems unnecessary given that a copy is already being made.
> > > Excerpt here:
> > >
> > > public Set<TopicPartition> assignment() {
> > >     acquire();
> > >     try {
> > >         // Copy the assigned partitions, then wrap the copy in a read-only view.
> > >         return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
> > >     } finally {
> > >         release();
> > >     }
> > > }
> > >
> > > Ideally the API would take and return a Set, instead of taking a List
> > > and returning a Set. Further, given that the Set returned is a copy of
> > > the existing assignment, wrapping it in an unmodifiable set seems like
> > > overkill: it forces the user of the API to make yet another copy just
> > > to modify what is already a copy.
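
The behavior described above can be reproduced with plain java.util
collections: the wrapped copy rejects add() with UnsupportedOperationException,
so a second copy is needed before editing. This sketch uses String stand-ins
for TopicPartition and mirrors the excerpt's
Collections.unmodifiableSet(new HashSet<>(...)) pattern rather than calling
Kafka; CopyDemo and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CopyDemo {
    // Mirrors the excerpt: a defensive copy wrapped in a read-only view.
    static Set<String> assignment(Set<String> internal) {
        return Collections.unmodifiableSet(new HashSet<>(internal));
    }

    // Returns true if the set rejects modification.
    static boolean isReadOnly(Set<String> s) {
        try {
            s.add("probe-0");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // The workaround from the thread: copy again, edit, then convert to a List.
    static List<String> edited(Set<String> readOnly, String add, String remove) {
        Set<String> copy = new HashSet<>(readOnly);
        copy.add(add);
        copy.remove(remove);
        return new ArrayList<>(copy);
    }
}
```

Either way the internal set is never touched; the wrapper only decides whether
the caller may edit the first copy or must make a second one.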
> > >
> > > Thanks,
> > > Rajiv
> > >
> >
>
