vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169767646


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements 
ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, 
partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} 
since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {

Review Comment:
   It works under normal scenarios. I tested by deleting 10 topics in one go 
and still didn't notice this error happening. In this case, when a large number 
of topics gets deleted is when we see this error. Note that I don't know what 
is the defintion of large but going by the update of the reporter of the 
ticket, they experience it with a few dozens of topics deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to