[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-27 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207962325


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -1367,9 +1367,14 @@ public WorkerTask doBuild(Task task,
                     connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
 
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-worker-adminclient-" + id.connector(),

Review Comment:
   Ok. Would do that. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-27 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207962014


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -695,9 +705,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not assigning offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                            "Checking if the topic {} exists", tp, tp.topic());
+                    Map<String, TopicDescription> topic = topicAdmin.describeTopics(tp.topic());

Review Comment:
   > This adds new ACL requirements for sink connectors' admin clients,
   
   That's a good point and something I missed. 
   
   > Probably safest to assume that the topic exists if we fail to describe it.
   
   Yeah. If, say, a `TopicAuthorizationException` is thrown, it is again hard to 
tell whether the topic exists. `TopicAdmin#describeTopics` throws a 
`ConnectException` in this case, which could be a false positive. And since this 
PR propagates any exception from `TopicAdmin#describeTopics`, the connector might 
now fail with a `ConnectException`, albeit a different one. So, as you said, it 
is probably safest to assume that the topic exists in that case.
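
A minimal sketch of that fallback, using only the JDK. `TopicExistenceCheck` and `isDefinitelyDeleted` are hypothetical names, not part of Kafka Connect, and the `describe` function merely stands in for a call such as `TopicAdmin#describeTopics` (here simplified to return topic name to partition count). The point is only that a failed lookup is treated as "topic exists":

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of Kafka Connect.
final class TopicExistenceCheck {

    // 'describe' stands in for a describe-topics call: it is assumed to return
    // an empty map when the topic is missing and to throw when the lookup
    // itself fails (for example, because of an authorization error).
    static boolean isDefinitelyDeleted(String topic,
                                       Function<String, Map<String, Integer>> describe) {
        try {
            return describe.apply(topic).isEmpty();
        } catch (RuntimeException e) {
            // The lookup failed, so nothing was learned about the topic.
            // Err on the side of assuming that it still exists.
            return false;
        }
    }
}
```

With this shape, only an empty (successful) describe result marks a topic as deleted; an authorization failure leaves the partition to be handled as before.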
   
   






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-27 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207922909


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   > It's unlikely that Kafka Connect is the only consumer application that is 
impacted by this scenario
   
   I understand that Connect is not the only consumer application that would be 
impacted by this behaviour. But at the moment, it does impact users in certain 
scenarios, as described in the ticket. Nonetheless, I will check with engineers 
from the Client team. 






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-26 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1207577607


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   This block of code reuses the existing logic in `TopicAdmin` to determine 
whether a topic exists: 
https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L503-L511.
 I am just caching the result: once we have established that a topic is deleted 
for partition p1, we don't need to check again for other partitions of the 
same topic within this flow.
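
The caching idea can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the PR's code: `DeletedTopicCache`, `Partition`, and the `topicExists` predicate are hypothetical stand-ins, and the sketch checks existence for every partition, whereas the PR only does so after `consumer.position(tp)` times out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration of per-topic caching within one assignment pass.
final class DeletedTopicCache {

    // Minimal stand-in for Kafka's TopicPartition.
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Number of existence lookups performed; kept only to make the caching visible.
    int lookups = 0;

    // Returns the partitions whose topic still exists. 'topicExists' is an
    // assumed, possibly expensive check (e.g. backed by an admin client).
    List<Partition> livePartitions(List<Partition> assigned, Predicate<String> topicExists) {
        Set<String> deletedTopics = new HashSet<>();
        List<Partition> live = new ArrayList<>();
        for (Partition tp : assigned) {
            if (deletedTopics.contains(tp.topic)) {
                continue; // topic already known to be deleted; skip the lookup
            }
            lookups++;
            if (topicExists.test(tp.topic)) {
                live.add(tp);
            } else {
                deletedTopics.add(tp.topic);
            }
        }
        return live;
    }
}
```

For a deleted topic with three assigned partitions, only the first partition triggers a lookup; the other two are skipped via the `deletedTopics` set.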






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-13 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1192983841


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                            "Checking if the topic {} exists", tp, tp.topic());
+                    Map<String, TopicDescription> topic = topicAdmin.describeTopics(tp.topic());
+                    if (topic.isEmpty()) {
+                        log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   changed.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-13 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1192983825


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   Changed this line.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-05-13 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1192983808


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -145,6 +154,10 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        Map<String, Object> adminProps = new HashMap<>();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerConfig.bootstrapServers());

Review Comment:
   I added the logic to create Admin client in Worker and pass it along to 
WorkerSinkTask.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-04-18 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169769282


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -145,6 +154,10 @@ public WorkerSinkTask(ConnectorTaskId id,
         this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.taskStopped = false;
         this.workerErrantRecordReporter = workerErrantRecordReporter;
+        Map<String, Object> adminProps = new HashMap<>();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, workerConfig.bootstrapServers());

Review Comment:
   Ok. Thanks for pointing this out. Will make the changes as suggested.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-04-18 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169768346


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                            "Checking if the topic {} exists", tp, tp.topic());
+                    Map<String, TopicDescription> topic = topicAdmin.describeTopics(tp.topic());
+                    if (topic.isEmpty()) {
+                        log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   Yeah would change it. Thanks
   






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-04-18 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169768087


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {

Review Comment:
   I did some analysis and gave a plausible explanation from this comment onwards: 
https://issues.apache.org/jira/browse/KAFKA-14750?focusedCommentId=17710077&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17710077






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-04-18 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169767646


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {

Review Comment:
   It works under normal scenarios. I tested by deleting 10 topics in one go 
and still didn't see this error. We only see it when a large number of topics 
is deleted. Note that I don't know what the definition of "large" is, but going 
by the update from the reporter of the ticket, they experience it when a few 
dozen topics are deleted.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-04-18 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169764236


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not Committing offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());

Review Comment:
   Ok. I saw the `lastCommittedOffsets` map being updated and assumed this was 
about committing offsets. Will change it.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13504: KAFKA-14750: Check if topic exists in WorkerSinkTask when committing offsets.

2023-04-18 Thread via GitHub


vamossagar12 commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1169762441


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   Yeah, that's the behaviour that the consumer client exhibits. I think it's due 
to the fact that a topic deletion forces a consumer rebalance, and a rebalance 
typically goes through an `onPartitionsRevoked` -> `onPartitionsAssigned` 
cycle. I am not sure if this is documented anywhere explicitly for topic 
deletion. 
   I wouldn't qualify it as a bug though. I think it has been there for a long 
time. I think Chris had also tried to rope in the client team for another 
suggested change, but we haven't heard back yet.


