[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711589#comment-17711589 ]
Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:46 PM:
------------------------------------------------------------------

{quote}how big of an impact is this to you for your use case actually?{quote}

This issue is quite impactful. For background: we use Connect in a multi-tenant environment where each topic represents a tenant. A Streams application reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To reduce latency when a new tenant is provisioned, we pre-provision a few standby topics per Streams replica. The topics are managed by Strimzi and are owned by the Streams pod, so when a Streams application is restarted, all its standby topics are deleted and new ones are created for the new instance. Restarting the Streams application is routine (e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so we hit sink connector failures in fairly common scenarios.

{quote}Is the test you listed in the OP a reflection of how the connect is used by you?{quote}

Yes. The above should explain it.

{quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry?{quote}

I'd say we throw the original exception.


> Sink connector fails if a topic matching its topics.regex gets deleted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14750
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Sergei Morozov
>            Assignee: Sagar Rao
>            Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start ZooKeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the broker:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>       --bootstrap-server localhost:9092 \
>       --create \
>       --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector.
> Check that the number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>   --bootstrap-server localhost:9092 \
>   --group connect-local-file-sink \
>   --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>       --bootstrap-server localhost:9092 \
>       --delete \
>       --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition connect-test-211-0 could be determined{quote}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
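The handling discussed in the comment thread (skip the offset commit when the topic has been deleted, otherwise rethrow the original error) can be sketched in shell. This is only an illustration of the decision logic, not the actual Connect-internal fix (which would live in the Java worker and use the Admin client); the function names `topic_exists` and `handle_commit_failure` are hypothetical, and `topic_exists` is stubbed so the sketch is self-contained.

```shell
#!/usr/bin/env bash
# Sketch of the proposed handling: on a failed offset commit, check
# whether the topic still exists; if it is gone, skip the commit,
# otherwise surface the original error.

# topic_exists is a placeholder for a real metadata lookup, e.g.:
#   bin/kafka-topics.sh --bootstrap-server localhost:9092 \
#     --describe --topic "$1" >/dev/null 2>&1
# Stubbed here: pretend only topics listed in $EXISTING_TOPICS exist.
topic_exists() {
  case " $EXISTING_TOPICS " in
    *" $1 "*) return 0 ;;
    *)        return 1 ;;
  esac
}

handle_commit_failure() {
  local topic="$1" original_error="$2"
  if topic_exists "$topic"; then
    echo "topic $topic still exists; rethrowing: $original_error"
    return 1
  else
    echo "topic $topic was deleted; skipping offset commit"
    return 0
  fi
}

EXISTING_TOPICS="connect-test-1 connect-test-2"
handle_commit_failure connect-test-211 "TimeoutException: position could not be determined"
# -> topic connect-test-211 was deleted; skipping offset commit
```

In the deleted-topic case the failure is expected and swallowing the commit error is safe, because the consumer group rebalance will drop the partition anyway; in the still-exists case the error is genuine, so rethrowing preserves the current failure behavior.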