Hi Rajiv, I think it makes sense to return a read-only assignment set. What we could improve here is adding addPartition and removePartition methods to the consumer. Then we wouldn't have to do any operations on the set returned by the assignment() method.
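Until the consumer has such methods, the same effect can be had in user code. Below is a minimal sketch that assumes the 0.9 assign(List<TopicPartition>) signature. The hypothetical PartitionTracker class keeps its own assignment set and re-sends the complete set after every add or remove, so assignment() is never read back or copied. The class and method names are illustrative only, and a generic T stands in for TopicPartition so the sketch compiles with nothing but java.util.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical user-side helper (not part of the Kafka client) that emulates the
// proposed addPartition/removePartition methods. T stands in for TopicPartition.
final class PartitionTracker<T> {
    // The authoritative assignment, owned by the application instead of being
    // read back from assignment(). Insertion order just keeps output predictable.
    private final Set<T> assigned = new LinkedHashSet<>();
    // Receives the complete new assignment, e.g. consumer::assign with the 0.9 client.
    private final Consumer<List<T>> applyAssignment;

    PartitionTracker(Consumer<List<T>> applyAssignment) {
        this.applyAssignment = applyAssignment;
    }

    // Starts consuming the given partitions; returns false (and skips assign) if all were already assigned.
    boolean addPartitions(Collection<T> partitions) {
        boolean changed = assigned.addAll(partitions);
        if (changed) {
            push();
        }
        return changed;
    }

    // Stops consuming the given partitions entirely; returns false if none of them were assigned.
    boolean removePartitions(Collection<T> partitions) {
        boolean changed = assigned.removeAll(partitions);
        if (changed) {
            push();
        }
        return changed;
    }

    // Read-only view, for the same decoupling reason assignment() is read-only.
    Set<T> current() {
        return Collections.unmodifiableSet(assigned);
    }

    private void push() {
        // assign() replaces the whole assignment, so always hand over the full set as a List.
        applyAssignment.accept(new ArrayList<>(assigned));
    }
}
```

With the real client you would construct it as new PartitionTracker<TopicPartition>(consumer::assign) and call addPartitions(...) or removePartitions(...) whenever the partitions to consume change. If the class sits next to Kafka imports, qualify java.util.function.Consumer to avoid clashing with Kafka's own Consumer interface. Keeping the set in the application is also the approach Jason suggests in the quoted thread.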
BTW, I think you can implement the PartitionAssignor interface to solve your use case. I couldn't find the javadoc for that interface, but here is the method you can use:

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     * @param metadata Current topic/broker metadata known by consumer
     * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)}
     * @return A map from the members to their respective assignment. This should have one entry
     *         for each member in the input subscription map.
     */
    Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

The subscription map is keyed by each consumer's member id. You can use that id as a reference to the consumer and adjust the assignments there.

On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> Hi Jason,
>
> The copying is not a problem in terms of performance. It's just annoying to
> write the extra code. My point with the copy is that since the client is
> already making a copy when it returns the set to me, why would it matter if
> I modify the copy? Creating an unmodifiable set on top of a copy seems
> redundant. It would be easiest for us as users to do something like this:
>
> // This already returns a copy of the underlying assignment, thus ensuring
> // that the internal data structures are protected.
> final Set<TopicPartition> partitions = consumer.assignment();
> // This is fine to modify since consumer.assignment() returns a copy.
> partitions.add(myNewTopicPartition);
> partitions.remove(topicPartitionToBeRemoved);
> consumer.assign(partitions);
>
> Instead we have to do something like this right now.
>
> // This returns a copy of the underlying assignment wrapped in an
> // UnmodifiableSet, which seems redundant.
> final Set<TopicPartition> partitions = consumer.assignment();
> // We need this copy since consumer.assignment() is unmodifiable, even
> // though it is a copy.
> final Set<TopicPartition> yetAnotherCopy = new HashSet<>(partitions);
> yetAnotherCopy.add(myNewTopicPartition);
> yetAnotherCopy.remove(topicPartitionToBeRemoved);
> List<TopicPartition> wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> consumer.assign(wayTooManyCopies);
>
> Thanks,
> Rajiv
>
>
> On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hey Rajiv,
> >
> > I agree the Set/List inconsistency is a little unfortunate (another
> > annoying one is pause() which uses a vararg). I think we should probably
> > add the following variants:
> >
> > assign(Collection<TopicPartition>)
> > subscribe(Collection<String>)
> > pause(Collection<TopicPartition>)
> >
> > I can open a JIRA to fix this. As for returning the unmodifiable set, I can
> > see your point, but I think it's a little dangerous for user code to depend
> > on being able to modify a collection returned from the API. Making it
> > immutable reduces the coupling with user code and gives us more freedom in
> > the future (not that we have any intention of changing the set type, but we
> > could). I think the way I might try to implement your use case would be to
> > maintain the assignment set yourself. You can make changes to that set and
> > always pass it to assign(), which would avoid the need to use assignment().
> > Also, I probably wouldn't be overly concerned about the copying overhead
> > unless profiling shows that it is actually a problem. Are your partition
> > assignments generally very large?
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > partitions. We consume partitions based on our own logic instead of
> > > delegating that to Kafka.
> > > One of our use cases is handling a change in the
> > > partitions that we consume. This means that sometimes we need to consume
> > > additional partitions and other times we need to stop consuming (not pause
> > > but stop entirely) some of the partitions that we are currently polling.
> > >
> > > The semantics of the assign() call at
> > >
> > > http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > is that we need to provide the entire list of subscriptions. So when we
> > > want to add or remove partitions we call the assignment() method to get the
> > > existing set of TopicPartitions being polled, and then modify this set and
> > > pass it back to the assign() call. However, it seems weird that the assign()
> > > call takes a List<TopicPartition> whereas the assignment() call returns a
> > > Set<TopicPartition>. Further, the Set returned by the method is an
> > > unmodifiable set, which means that to change this set we need to create a new
> > > List/Set from it and then modify the new collection. Looking at the code
> > > for the assignment() method further shows that a copy of the underlying set
> > > is made and then wrapped in an unmodifiable set. The wrapping seems
> > > unnecessary given that a copy is already being made. Excerpt here:
> > >
> > > public Set<TopicPartition> assignment() {
> > >     acquire();
> > >     try {
> > >         return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
> > >     } finally {
> > >         release();
> > >     }
> > > }
> > >
> > > Ideally the API would take and return a Set instead of taking in a List and
> > > returning a Set. Further, given that the Set returned is a copy of the
> > > existing assignments, wrapping it in an unmodifiable set seems overkill,
> > > since it requires the user of the API to make yet another copy just to
> > > modify what is already a copy.
> > >
> > > Thanks,
> > > Rajiv
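The copy-then-modify workaround from the original message can at least be written once instead of at every call site. Below is a minimal sketch using only java.util and a hypothetical helper name (withChanges). It accepts whatever assignment() returns, read-only or not, applies the additions and removals in the same order as the quoted snippet, and returns the List that the 0.9 assign() takes. It still copies internally; per Jason's reply, that overhead is unlikely to matter unless profiling says otherwise.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class AssignmentUpdates {
    private AssignmentUpdates() {}

    // Hypothetical helper: derives the full list to pass to assign() from the
    // (possibly read-only) set returned by assignment(). The input is never modified.
    static <T> List<T> withChanges(Set<T> current, Collection<T> toAdd, Collection<T> toRemove) {
        // Copy into a modifiable, insertion-ordered set so duplicates collapse.
        Set<T> next = new LinkedHashSet<>(current);
        // Same order as the quoted snippet: add first, then remove,
        // so a partition listed in both ends up unassigned.
        next.addAll(toAdd);
        next.removeAll(toRemove);
        // assign() in 0.9 takes a List, so convert at the end.
        return new ArrayList<>(next);
    }
}
```

With the real client the whole update collapses to one call, for example consumer.assign(AssignmentUpdates.withChanges(consumer.assignment(), Collections.singletonList(myNewTopicPartition), Collections.singletonList(topicPartitionToBeRemoved))), with T inferred as TopicPartition.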