vamossagar12 commented on code in PR #13504: URL: https://github.com/apache/kafka/pull/13504#discussion_r1207577607
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ########## @@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions); - + Set<String> deletedTopics = new HashSet<>(); for (TopicPartition tp : partitions) { - long pos = consumer.position(tp); + if (deletedTopics.contains(tp.topic())) { Review Comment: This block of code uses already existing logic within `TopicAdmin` to figure out if a topic exists or not. https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L503-L511. I am just caching it because if we established that a topic is deleted for partition p1, then we don't need to check again for other partitions of the same topic within this flow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org