[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710994#comment-17710994 ]

Sagar Rao commented on KAFKA-14750:
-----------------------------------

Hi [~morozov], I am not an expert on the Kafka clients internals either, but 
from reading the code, here is what appears to be happening: when multiple 
topic partitions are deleted, the offset fetcher gets stuck and eventually the 
60s timeout expires. Because of the way this test is set up, none of the topic 
partitions have any offsets committed for them, so the fetcher tries to 
retrieve them, and with that many requests in flight the OffsetFetch cannot 
finish in time. This is a very naive description of the problem, though.

Regarding the solution to this problem, how big an impact does this actually 
have on your use case? Is the test you listed in the OP representative of how 
you use Connect? That could give us some clues about the trade-off we might 
need to make to get around this.

 

We can do as you suggested: catch the `TimeoutException` and check via the 
admin client whether the topic exists. If it doesn't exist, we can skip the 
offset commit, but what happens if it does exist? Do we retry? Is that even a 
possible scenario, i.e. can we assume that if the position API throws 
`TimeoutException`, it definitely means the TopicPartition doesn't exist 
(which I am not sure about)?
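For illustration, here is a rough sketch of what I mean (a hypothetical 
helper, not the actual WorkerSinkTask code): catch the timeout from 
`position()`, ask the admin client whether the topic still exists, and only 
skip the commit when it is gone.

{code:java}
import java.time.Duration;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

final class PositionOrSkip {
    /**
     * Hypothetical helper: returns the consumer position, or -1 when the
     * backing topic no longer exists, so the caller can skip the offset
     * commit for that partition. Rethrows the timeout when the topic is
     * still there, because then we cannot tell why position() failed.
     */
    static long positionOrSkip(Consumer<?, ?> consumer, Admin admin, TopicPartition tp)
            throws Exception {
        try {
            return consumer.position(tp, Duration.ofSeconds(60));
        } catch (TimeoutException e) {
            Set<String> topics = admin.listTopics().names().get();
            if (!topics.contains(tp.topic())) {
                return -1L; // topic was deleted: skip committing this partition
            }
            throw e; // topic still exists, so the timeout has some other cause
        }
    }
}
{code}

The open question above still stands, though: if the timeout can have causes 
other than topic deletion, the rethrow branch would need a retry policy.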

> Sink connector fails if a topic matching its topics.regex gets deleted
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-14750
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14750
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.3.1
>            Reporter: Sergei Morozov
>            Assignee: Sagar Rao
>            Priority: Major
>
> Steps to reproduce:
> # In {{{}config/connect-standalone.properties{}}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh \
>     config/connect-standalone.properties \
>     config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check 
> that the number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms 
> expired before the position for partition connect-test-211-0 could be 
> determined
> {quote}



