[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17693369#comment-17693369 ]
Chris Egerton edited comment on KAFKA-14750 at 2/27/23 3:28 PM:
----------------------------------------------------------------

Thanks for filing this, [~morozov]! I've done some local testing and confirmed that the issue affects the current trunk, and after doing some digging, I suspect it goes back pretty far.

Initially I believed that this was simply a matter of lowering the task's consumer's [metadata refresh interval|https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms], which would cause it to detect changes in its topic regex subscription sooner. However, even after making that tweak, issues still surfaced. This is because, after a topic is deleted, the task's [consumer rebalance listener|https://github.com/apache/kafka/blob/400ba0aeaeb6c460069d5ad12b1b3976ab447332/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L699-L784] is notified of the revocation of the partitions for that topic, which triggers an attempt to commit offsets, including offsets for the revoked topic partitions.

There are a couple of approaches I can think of for this:
# Adjust the Connect runtime's behavior to somehow discern the set of still-existing topic partitions before committing offsets, and skip committing offsets for recently-deleted topic partitions
# Tweak the consumer logic to invoke {{ConsumerRebalanceListener::onPartitionsLost}} instead of {{ConsumerRebalanceListener::onPartitionsRevoked}} for deleted topic partitions (see the sketch below)

Given that option 1 is inherently subject to race conditions, I'd prefer to pursue option 2 initially. However, I'm not too familiar with the clients side of things, so it'd be nice to get a second opinion. [~jasong35] [~pnee] if either of you get a chance, would you mind weighing in here?

TL;DR: Should we be treating deleted topic partitions as "lost" instead of "revoked" with consumer rebalance listeners?
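As a side note on the metadata-refresh tweak mentioned above: for a sink connector it can be applied per-connector via a client config override, assuming the worker's override policy permits it ({{connector.client.config.override.policy=All}} in the worker config; the default policy rejects overrides). The 5000 ms value below is just an example, not a recommendation; as noted, this alone did not fix the failure:

{code:bash}
# In config/connect-file-sink.properties (hypothetical tweak, for illustration):
# lower the consumer's metadata refresh interval so the regex subscription
# notices created/deleted topics sooner. Requires a permissive
# connector.client.config.override.policy on the worker.
consumer.override.metadata.max.age.ms=5000
{code}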
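To make the lost-vs-revoked distinction concrete, here's a minimal sketch of the two callbacks for a plain consumer. The {{LostVsRevokedListener}} class name is hypothetical, and this is a simplification of what WorkerSinkTask does, not the actual code. The key point: {{onPartitionsRevoked}} assumes the partitions still exist and commits their offsets, while {{onPartitionsLost}} deliberately commits nothing, which is why routing deleted-topic partitions to it should avoid the failure in this ticket:

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical listener, for illustration only.
public class LostVsRevokedListener implements ConsumerRebalanceListener {

    private final Consumer<byte[], byte[]> consumer;

    public LostVsRevokedListener(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Revoked partitions are still owned until this callback returns, so
        // committing their offsets is normally safe. But if the topic behind a
        // partition was just deleted, position() / commitSync() can block until
        // they time out -- the failure mode described in this ticket.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        consumer.commitSync(offsets);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // Lost partitions may already be owned by another consumer (or, under
        // option 2, may no longer exist at all), so no offsets are committed.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do for this sketch.
    }
}
{code}

A consumer would register it via {{consumer.subscribe(Pattern.compile("connect-test-.*"), new LostVsRevokedListener(consumer))}}. Today the deleted-topic case reaches {{onPartitionsRevoked}}; option 2 would route it to {{onPartitionsLost}} instead.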
> Sink connector fails if a topic matching its topics.regex gets deleted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14750
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Sergei Morozov
>            Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>       --bootstrap-server localhost:9092 \
>       --create \
>       --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check that the number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>   --bootstrap-server localhost:9092 \
>   --group connect-local-file-sink \
>   --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>       --bootstrap-server localhost:9092 \
>       --delete \
>       --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition connect-test-211-0 could be determined
> {quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)