C0urante commented on code in PR #13504:
URL: https://github.com/apache/kafka/pull/13504#discussion_r1206885931
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -695,9 +705,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {
+                    log.debug("Not assigning offsets for topic-partition {} since the topic {} has been deleted", tp, tp.topic());
+                    continue;
+                }
+                long pos;
+                try {
+                    pos = consumer.position(tp);
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException occurred when fetching position for topic partition {}. " +
+                            "Checking if the topic {} exists", tp, tp.topic());
+                    Map<String, TopicDescription> topic = topicAdmin.describeTopics(tp.topic());

Review Comment:
This adds new ACL requirements for sink connectors' admin clients, which is a breaking change and cannot be done until the next major release. We also need to handle generic exceptions thrown during this call. Probably safest to assume that the topic exists if we fail to describe it.
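The fallback the reviewer suggests (treating any failed describe call as "the topic still exists") can be sketched roughly as below. The `TopicDescriber` interface and `topicAssumedToExist` helper are illustrative stand-ins for this sketch, not Connect's actual `TopicAdmin` API:

```java
import java.util.Map;

// Sketch of the reviewer's suggestion: if describing a topic fails for any
// reason (missing DESCRIBE ACLs, timeouts, or other errors), fall back to
// assuming the topic exists, so the task never skips offsets for a live topic.
public class SafeDescribe {

    // Stand-in for an admin client's describe call; real code would use
    // TopicAdmin::describeTopics, which may throw on authorization failures.
    interface TopicDescriber {
        Map<String, Object> describeTopics(String topic) throws Exception;
    }

    static boolean topicAssumedToExist(TopicDescriber admin, String topic) {
        try {
            Map<String, Object> result = admin.describeTopics(topic);
            // Treat the topic as deleted only on a definitive empty answer.
            return result != null && result.containsKey(topic);
        } catch (Exception e) {
            // Describe failed (e.g. no DESCRIBE ACL): assume the topic exists.
            return true;
        }
    }

    public static void main(String[] args) {
        TopicDescriber failing = topic -> { throw new Exception("no DESCRIBE ACL"); };
        TopicDescriber empty = topic -> Map.of();
        System.out.println(topicAssumedToExist(failing, "t1")); // true: failure, so assume exists
        System.out.println(topicAssumedToExist(empty, "t1"));   // false: definitive answer
    }
}
```

With this shape, only a confirmed "topic not found" answer ever feeds the deleted-topics set, which avoids forcing new ACLs on existing deployments.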
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
We should not be relying on undocumented behavior like this without confirmation from someone familiar with the clients library that it's intentional, or at the very least, that it won't change in the future without a KIP.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1367,9 +1367,14 @@ public WorkerTask doBuild(Task task,
                 connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SINK);
         KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+        Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-worker-adminclient-" + id.connector(),

Review Comment:
We already construct an admin client for sink connectors [if they use a DLQ topic](https://github.com/apache/kafka/blob/6d72c26731fe69955127a90e3d43f6d9eb41e2d3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L947-L951). We should create at most one admin client per sink task, which probably means refactoring the `sinkTaskReporters` method to accept an admin client if we're going to be creating one unconditionally here.
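One way to realize the "at most one admin client per sink task" refactor the reviewer describes is to hand both the DLQ reporter setup and the rebalance listener a lazily memoizing supplier, so the client is created only once and only if something actually asks for it. This is a minimal sketch of that pattern; the `memoize` helper and the use of `Object` in place of a real admin client are assumptions for illustration, not Connect's actual code:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch: share a single lazily-created client between two consumers of it
// (e.g. a DLQ reporter and a topic-existence check) via a memoizing supplier.
public class SharedAdmin {

    static <T> Supplier<T> memoize(Supplier<T> factory) {
        AtomicReference<T> ref = new AtomicReference<>();
        return () -> {
            T value = ref.get();
            if (value == null) {
                // Racing threads may invoke the factory twice; compareAndSet
                // keeps one winner (a real implementation would close the loser).
                ref.compareAndSet(null, factory.get());
                value = ref.get();
            }
            return value;
        };
    }

    public static void main(String[] args) {
        int[] created = {0};
        Supplier<Object> admin = memoize(() -> { created[0]++; return new Object(); });
        Object a = admin.get(); // e.g. fetched by the DLQ reporter
        Object b = admin.get(); // e.g. reused by the rebalance listener
        System.out.println(a == b);     // true: same client instance
        System.out.println(created[0]); // 1: only one client was created
    }
}
```

Passing the supplier (rather than an eagerly built client) into `sinkTaskReporters` would keep today's behavior for connectors without a DLQ: no admin client is constructed unless some code path actually needs one.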
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -700,9 +714,28 @@ private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-
+            Set<String> deletedTopics = new HashSet<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                if (deletedTopics.contains(tp.topic())) {

Review Comment:
Also, this entire block relies on racy logic to detect deleted topics, which may fail if a topic is deleted after the check takes place. Again, we should reach out to someone familiar with the clients library. It's unlikely that Kafka Connect is the only consumer application affected by this scenario, and it's better to fix this at the clients level instead of implementing Connect-specific workarounds.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org