C0urante commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1206885931


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -695,9 +705,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not assigning offsets for topic-partition {} 
since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching 
position for topic partition {}. " +
+                            "Checking if the topic {} exists", tp, tp.topic());
+                    Map<String, TopicDescription> topic = 
topicAdmin.describeTopics(tp.topic());

Review Comment:
   This adds new ACL requirements for sink connectors' admin clients, which is a breaking change and cannot be done until the next major release.
   
   We also need to handle generic exceptions thrown during this call. Probably safest to assume that the topic exists if we fail to describe it.
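   
   A minimal sketch of that defensive handling, reusing the names from the diff above (`topicAdmin`, `log`, `tp`, and the `deletedTopics` set). This illustrates the suggestion rather than prescribing the final implementation, and it assumes that any failure to describe the topic means the topic should be treated as still existing:
   
   ```java
   // Sketch only: assume the topic exists unless describeTopics positively
   // confirms it is gone. A failed describe (missing ACLs, transient broker
   // issue, etc.) must not be treated as deletion.
   boolean topicDeleted = false;
   try {
       Map<String, TopicDescription> description = topicAdmin.describeTopics(tp.topic());
       topicDeleted = !description.containsKey(tp.topic());
   } catch (Exception e) {
       log.warn("Failed to describe topic {}; assuming it still exists", tp.topic(), e);
   }
   if (topicDeleted) {
       deletedTopics.add(tp.topic());
   }
   ```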



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   We should not be relying on undocumented behavior like this without confirmation from someone familiar with the clients library that it's intentional, or at the very least, that it won't change in the future without a KIP.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1367,9 +1367,14 @@ public WorkerTask doBuild(Task task,
                     connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
 
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-worker-adminclient-" + id.connector(),

Review Comment:
   We already construct an admin client for sink connectors [if they use a DLQ topic](https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L947-L951).
   
   We should create at most one admin client per sink task, which probably means refactoring the `sinkTaskReporters` method to accept an admin client if we're going to be creating one unconditionally here.
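   
   A rough sketch of that sharing pattern (illustrative only: `Admin.create` is the standard admin client factory, passing the client into `sinkTaskReporters` is the proposed refactor rather than the existing signature, and `sinkConfig`/`errorHandlingMetrics` stand in for the existing local variables):
   
   ```java
   // Construct the admin client once per sink task, from the overrides
   // built in the diff above...
   Admin admin = Admin.create(adminOverrides);
   
   // ...then hand the same instance to the DLQ reporter machinery instead of
   // letting it construct a second client internally.
   List<ErrorReporter> reporters = sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, admin);
   ```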



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
   Also, this entire block relies on racy logic to detect deleted topics that may fail if a topic is deleted after this check takes place. Again, we should reach out to someone familiar with the clients library. It's unlikely that Kafka Connect is the only consumer application that is impacted by this scenario, and it's better to fix this at the clients level instead of implementing Connect-specific workarounds.
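   
   To make the race concrete (a sketch using the names from the diff above): the describe call and the position fetch are two separate requests, so nothing prevents the topic from being deleted between them:
   
   ```java
   // Time-of-check: describeTopics reports the topic as present.
   Map<String, TopicDescription> desc = topicAdmin.describeTopics(tp.topic());
   // <-- another client may delete the topic right here
   // Time-of-use: this can still time out even though the check passed.
   long pos = consumer.position(tp);
   ```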


