junrao commented on code in PR #13797: URL: https://github.com/apache/kafka/pull/13797#discussion_r1265788947
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -522,7 +525,33 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb @Override public void assign(Collection<TopicPartition> partitions) { - throw new KafkaException("method not implemented"); + if (partitions == null) { + throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null"); + } + + if (partitions.isEmpty()) { + // TODO: implementation of unsubscribe() will be included in forthcoming commits. + // this.unsubscribe(); + return; + } + + for (TopicPartition tp : partitions) { + String topic = (tp != null) ? tp.topic() : null; + if (Utils.isBlank(topic)) + throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); + } + + // TODO: implementation of refactored Fetcher will be included in forthcoming commits. + // fetcher.clearBufferedDataForUnassignedPartitions(partitions); + + // assignment change event will trigger autocommit if is it configured and the group id is specified. This is Review Comment: is it => it is ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent event) { manager.addOffsetFetchRequest(event.partitions); return true; } + + private boolean process(final NewTopicsMetadataUpdateRequestEvent event) { + metadata.requestUpdateForNewTopics(); + return true; + } + + private boolean process(final AssignmentChangeApplicationEvent event) { + Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT); + CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get(); Review Comment: If `group.id` is not specified, will `commitRequestManger` be defined? If not, we can't blindly call `get`. If yes, then we need the logic to skip offset commit when `group.id` is not specified. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org