vamossagar12 commented on code in PR #13504: URL: https://github.com/apache/kafka/pull/13504#discussion_r1169767646
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ########## @@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions); - + Set<String> deletedTopics = new HashSet<>(); for (TopicPartition tp : partitions) { - long pos = consumer.position(tp); + if (deletedTopics.contains(tp.topic())) { + log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic()); + continue; + } + long pos; + try { + pos = consumer.position(tp); + } catch (TimeoutException e) { Review Comment: It works under normal scenarios. I tested by deleting 10 topics in one go and still didn't notice this error happening. In this case, when a large number of topics gets deleted is when we see this error. Note that I don't know what is the defintion of large but going by the update of the reporter of the ticket, they experience it with a few dozens of topics deleted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org