[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207962325

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ##
@@ -1367,9 +1367,14 @@ public WorkerTask doBuild(Task task,
                     connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
             KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);
+            Map adminOverrides = adminConfigs(id.connector(), "connector-worker-adminclient-" + id.connector(),

Review Comment:
Ok. Would do that.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207962014

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -695,9 +705,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not assigning offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                        "Checking if the topic {} exists", tp, tp.topic());
+                    Map topic = topicAdmin.describeTopics(tp.topic());

Review Comment:
> This adds new ACL requirements for sink connectors' admin clients,

That's a good point and something I missed.

> Probably safest to assume that the topic exists if we fail to describe it.

Yeah. If, say, a `TopicAuthorizationException` is thrown, it again becomes difficult to tell whether the topic exists or not. `TopicAdmin#describeTopics` throws a `ConnectException` in this case, which could be a false positive. And since this PR propagates any exception from `TopicAdmin#describeTopics`, the connector might now fail with a `ConnectException`, albeit a different one. So, as you said, it is probably safest to assume that the topic exists in that case.
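To make the "assume the topic exists if we fail to describe it" idea concrete, here is a minimal, self-contained sketch. It does not use the Kafka APIs: `TopicExistenceCheck`, `topicProbablyExists`, and the `describe` function are hypothetical stand-ins, with `describe` playing the role of `TopicAdmin#describeTopics` (assumed, as in the diff above, to return an empty map for a missing topic and to throw on failures such as authorization errors).

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper for illustration only; not part of Kafka Connect.
final class TopicExistenceCheck {

    private TopicExistenceCheck() { }

    /**
     * Returns false only when the describe call positively reports that the
     * topic is missing (empty result). Any failure to describe the topic is
     * treated as "cannot tell", and the topic is assumed to exist.
     *
     * @param describe hypothetical stand-in for TopicAdmin#describeTopics
     */
    static boolean topicProbablyExists(String topic,
                                       Function<String, Map<String, String>> describe) {
        try {
            return !describe.apply(topic).isEmpty();
        } catch (RuntimeException e) {
            // For example, an authorization failure: absence cannot be proven,
            // so err on the side of treating the topic as existing.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(topicProbablyExists("orders",
                t -> Collections.singletonMap(t, "description")));
        System.out.println(topicProbablyExists("orders",
                t -> Collections.emptyMap()));
        System.out.println(topicProbablyExists("orders",
                t -> { throw new IllegalStateException("not authorized"); }));
    }
}
```

The design choice here is that a false "deleted" verdict would silently skip a live partition, while a false "exists" verdict only preserves the pre-PR behaviour, so the failure path defaults to "exists".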
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207922909

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
> It's unlikely that Kafka Connect is the only consumer application that is impacted by this scenario

I understand that Connect is not the only consumer application that would be impacted by this behaviour. But ATM, it does impact the users in certain scenarios as described in the ticket. Nonetheless, I would check with engineers from the Client team.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207922909

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
> It's unlikely that Kafka Connect is the only consumer application that is impacted by this scenario

I understand that Connect is not the only consumer application that would be impacted by this behaviour. But ATM, it does impact the users in certain scenarios as described in the ticket. Nonetheless, I would involve somebody from the Client team.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207577607

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
This block of code uses the existing logic within `TopicAdmin` to figure out whether a topic exists: https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L503-L511. I am just caching the result, because once we have established that a topic is deleted for partition p1, we don't need to check again for other partitions of the same topic within this flow.
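The per-callback caching described above can be sketched in isolation as follows. This is a simplified illustration, not the PR's code: `DeletedTopicCache`, `Partition`, and the `topicExists` predicate are hypothetical names, and the sketch checks existence for every uncached topic, whereas the PR only falls back to the existence check after `consumer.position(tp)` times out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration of remembering deleted topics within one callback.
final class DeletedTopicCache {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    private DeletedTopicCache() { }

    /**
     * Returns the partitions whose topics still exist. A topic found to be
     * deleted is remembered, so its remaining partitions are skipped without
     * another existence check.
     */
    static List<Partition> livePartitions(List<Partition> assigned,
                                          Predicate<String> topicExists) {
        Set<String> deletedTopics = new HashSet<>();
        List<Partition> live = new ArrayList<>();
        for (Partition p : assigned) {
            if (deletedTopics.contains(p.topic)) {
                continue; // already known to be deleted; no second lookup
            }
            if (!topicExists.test(p.topic)) {
                deletedTopics.add(p.topic);
                continue;
            }
            live.add(p);
        }
        return live;
    }

    public static void main(String[] args) {
        List<Partition> assigned = Arrays.asList(
                new Partition("gone", 0), new Partition("gone", 1), new Partition("live", 0));
        List<Partition> live = livePartitions(assigned, topic -> !topic.equals("gone"));
        for (Partition p : live) {
            System.out.println(p.topic + "-" + p.partition);
        }
    }
}
```

Because the set is created inside the method, the cache lives only for one invocation, which matches the "within this flow" scoping in the comment: a topic recreated later is checked afresh on the next rebalance.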
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1192983841

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                        "Checking if the topic {} exists", tp, tp.topic());
+                    Map topic = topicAdmin.describeTopics(tp.topic());
+                    if (topic.isEmpty()) {
+                        log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
changed.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1192983825

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
Changed this line.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1192983808

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -145,6 +154,10 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        Map adminProps = new HashMap<>();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerConfig.bootstrapServers());

Review Comment:
I added the logic to create Admin client in Worker and pass it along to WorkerSinkTask.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169769282

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -145,6 +154,10 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        Map adminProps = new HashMap<>();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerConfig.bootstrapServers());

Review Comment:
Ok. Thanks for pointing this out. Will make the changes as suggested.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169768346

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                        "Checking if the topic {} exists", tp, tp.topic());
+                    Map topic = topicAdmin.describeTopics(tp.topic());
+                    if (topic.isEmpty()) {
+                        log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
Yeah would change it. Thanks
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169768087

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {

Review Comment:
I did some analysis and posted a plausible explanation from this comment onwards: https://issues.apache.org/jira/browse/KAFKA-14750?focusedCommentId=17710077&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17710077
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169767646

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {

Review Comment:
It works under normal scenarios. I tested by deleting 10 topics in one go and still didn't see this error. The error shows up only when a large number of topics are deleted. I don't know exactly what counts as large, but going by the ticket reporter's update, they experience it when a few dozen topics are deleted.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169764236

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
Ok. I saw `lastCommittedOffsets` map being updated and assumed this is about committing offsets. Will change it 👍
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.
vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169762441

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
Yeah, that's the behaviour that the consumer client exhibits. I think it's due to the fact that a topic deletion forces a consumer rebalance, and a rebalance typically goes through an `onPartitionsRevoked` -> `onPartitionsAssigned` cycle. I am not sure if this is documented anywhere explicitly for topic deletion. I wouldn't qualify it as a bug though; I think it has been there for a long time. I think Chris had also tried to rope in the client team for another suggested change, but we haven't heard back yet.