junrao commented on code in PR #13797:
URL: https://github.com/apache/kafka/pull/13797#discussion_r1265788947


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -522,7 +525,33 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
 
     @Override
     public void assign(Collection<TopicPartition> partitions) {
-        throw new KafkaException("method not implemented");
+        if (partitions == null) {
+            throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
+        }
+
+        if (partitions.isEmpty()) {
+            // TODO: implementation of unsubscribe() will be included in forthcoming commits.
+            // this.unsubscribe();
+            return;
+        }
+
+        for (TopicPartition tp : partitions) {
+            String topic = (tp != null) ? tp.topic() : null;
+            if (Utils.isBlank(topic))
+                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
+        }
+
+        // TODO: implementation of refactored Fetcher will be included in forthcoming commits.
+        // fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+
+        // assignment change event will trigger autocommit if is it configured and the group id is specified. This is

Review Comment:
   is it => it is



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -106,4 +114,17 @@ private boolean process(final OffsetFetchApplicationEvent event) {
         manager.addOffsetFetchRequest(event.partitions);
         return true;
     }
+
+    private boolean process(final NewTopicsMetadataUpdateRequestEvent event) {
+        metadata.requestUpdateForNewTopics();
+        return true;
+    }
+
+    private boolean process(final AssignmentChangeApplicationEvent event) {
+        Optional<RequestManager> commitRequestManger = registry.get(RequestManager.Type.COMMIT);
+        CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();

Review Comment:
   If `group.id` is not specified, will `commitRequestManger` be defined? If 
not, we can't blindly call `get()` on the `Optional`. If yes, then we need 
logic to skip the offset commit when `group.id` is not specified.
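
   A minimal sketch of the first case (no commit manager registered), for 
illustration only. `AssignmentChangeSketch`, `CommitManagerStub`, 
`maybeAutoCommit` and `processAssignmentChange` are hypothetical stand-ins, not 
the classes or methods in this PR, and the boolean here means "a commit was 
attempted", which is an assumption and may differ from what `process` returns 
in the real code.

```java
import java.util.Optional;

public class AssignmentChangeSketch {

    /** Hypothetical stand-in for the commit request manager; not the Kafka class. */
    static class CommitManagerStub {
        int autoCommitCalls = 0;

        void maybeAutoCommit() {
            autoCommitCalls++;
        }
    }

    /**
     * Handles an assignment change. The Optional is empty when no commit
     * manager was registered (assumed here to be the "no group.id" case),
     * so the commit is skipped instead of calling get() unconditionally.
     *
     * @return true if an auto-commit was attempted, false if it was skipped
     */
    static boolean processAssignmentChange(Optional<CommitManagerStub> commitManager) {
        if (!commitManager.isPresent()) {
            return false; // nothing to commit against
        }
        commitManager.get().maybeAutoCommit();
        return true;
    }

    public static void main(String[] args) {
        CommitManagerStub stub = new CommitManagerStub();
        System.out.println(processAssignmentChange(Optional.of(stub))); // prints "true"
        System.out.println(processAssignmentChange(Optional.empty()));  // prints "false"
    }
}
```

   If the manager is always registered, the same guard would instead have to 
check whether a group id was configured before committing.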


