Hey Rajiv,

I agree the Set/List inconsistency is a little unfortunate (another
annoying one is pause() which uses a vararg). I think we should probably
add the following variants:

assign(Collection<TopicPartition>)
subscribe(Collection<String>)
pause(Collection<TopicPartition>)

I can open a JIRA to fix this. As for returning the unmodifiable set, I can
see your point, but I think it's a little dangerous for user code to depend
on being able to modify a collection returned from the API. Making it
immutable reduces the coupling with user code and gives us more freedom in
the future (not that we have any intention of changing the set type, but we
could). I think the way I might try to implement your use case would be to
maintain the assignment set yourself. You can make changes to that set and
always pass it to assign(), which would avoid the need to use assignment().
Also, I probably wouldn't be overly concerned about the copying overhead
unless profiling shows that it is actually a problem. Are your partition
assignments generally very large?

-Jason


On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> We are trying to use the Kafka 0.9 consumer API to poll specific
> partitions. We consume partitions based on our own logic instead of
> delegating that to Kafka. One of our use cases is handling a change in the
> partitions that we consume. This means that sometimes we need to consume
> additional partitions and other times we need to stop consuming (not pause
> but stop entirely) some of the partitions that we are currently polling.
>
> The semantics of the assign() call at
>
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> is that we need to provide the entire list of subscriptions. So when we
> want to add or remove partitions we call the assignment() method to get the
> existing set of TopicPartitions being polled, and then modify this set and
> pass it back to the assign() call. However it seems weird that the assign()
> call takes a List<TopicPartitions> whereas the assignment call returns a
> Set<TopicPartitions>. Further the Set returned by the method is an
> unmodifiable set which means to change this set we need to create a new
> List/Set from it and then modify the new collection. Looking at the code
> for the assignment() method further shows that a copy of the underlying set
> is made and then wrapped in an unmodifiable set. The wrapping seems
> unnecessary given that a copy is already being made. Excerpt here:
>
> public Set<TopicPartition> assignment() {
>
>         acquire();
>
>         try {
>
>             return Collections.unmodifiableSet(new HashSet<>(this.
> subscriptions.assignedPartitions()));
>
>         } finally {
>
>             release();
>
>         }
>
>     }
>
> Ideally the API would take and return a Set instead of taking in a List and
> returning a Set. Further given that the Set returned is a copy of the
> existing assignments, wrapping it in an unmodifiable set seems overkill
> which requires the user of the API to make yet another copy just to modify
> what is already a copy.
>
> Thanks,
> Rajiv
>

Reply via email to