[ https://issues.apache.org/jira/browse/KAFKA-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16952285#comment-16952285 ]

Randall Hauch commented on KAFKA-8370:
--------------------------------------

I agree that adding a check for the existence of the topic will not guarantee
that we won't run into this situation, but if we were to add that check it
would likely solve the problem most of the time. I also agree that KAFKA-7633
has since fixed the issue w/r/t topic authorization, but given the
`InvalidReplicationFactorException` above, perhaps we should make two
changes:

# Check whether the topic exists before attempting to use the AdminClient to
create the topic; and
# Also handle the `InvalidReplicationFactorException` with a more useful error
message.
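A minimal, self-contained sketch of these two changes follows. It does not use the real Kafka client: `FakeCluster`, `ensureTopic`, and the two exception classes are hypothetical stand-ins (a real worker would go through the AdminClient, e.g. `listTopics()` or `describeTopics(...)` before `createTopics(...)`). The fake broker validates the replication factor before checking existence, which is the ordering reported in this issue. Because the existence check cannot close the race with another worker, the sketch still tolerates "already exists" from the create call.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: FakeCluster stands in for the brokers plus the AdminClient.
// None of these class or method names are part of Kafka's API.
public class TopicCheckSketch {

    // Hypothetical stand-in for InvalidReplicationFactorException.
    static class InvalidReplicationFactor extends RuntimeException {
        InvalidReplicationFactor(String msg) { super(msg); }
    }

    // Hypothetical stand-in for TopicExistsException.
    static class TopicExists extends RuntimeException {
        TopicExists(String msg) { super(msg); }
    }

    static class FakeCluster {
        final int liveBrokers;
        final Set<String> topics = new HashSet<>();

        FakeCluster(int liveBrokers) { this.liveBrokers = liveBrokers; }

        boolean topicExists(String name) { return topics.contains(name); }

        // Mirrors the ordering reported in KAFKA-8370: the replication factor
        // is validated before the broker considers whether the topic exists.
        void createTopic(String name, short replicationFactor) {
            if (replicationFactor > liveBrokers) {
                throw new InvalidReplicationFactor("Replication factor: " + replicationFactor
                        + " larger than available brokers: " + liveBrokers + ".");
            }
            if (!topics.add(name)) {
                throw new TopicExists("Topic '" + name + "' already exists.");
            }
        }
    }

    // Returns true if the topic was created, false if an existing topic is used as-is.
    static boolean ensureTopic(FakeCluster cluster, String topic, short replicationFactor) {
        // Change 1: an existing topic is used as-is, so no create request is sent
        // and the broker's replication-factor check is never reached.
        if (cluster.topicExists(topic)) {
            return false;
        }
        try {
            cluster.createTopic(topic, replicationFactor);
            return true;
        } catch (TopicExists e) {
            // Another worker created the topic between the check and the create.
            return false;
        } catch (InvalidReplicationFactor e) {
            // Change 2: say what went wrong and how to fix it; keep the original as the cause.
            throw new IllegalStateException("Unable to create internal topic '" + topic
                    + "' with replication factor " + replicationFactor + " (" + e.getMessage()
                    + "). Start more brokers or lower the replication factor in the worker config.", e);
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster(2);   // e.g. one of three brokers is down
        cluster.topics.add("connect-offsets");      // created earlier with replication factor 3

        // Existing topic: used as-is despite the replication factor mismatch.
        System.out.println(ensureTopic(cluster, "connect-offsets", (short) 3));

        // Missing topic: still rejected, but with a more useful message.
        try {
            ensureTopic(cluster, "connect-status", (short) 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `false` for the existing `connect-offsets` topic, then the clearer error for the missing `connect-status` topic.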

I think this is warranted because when the topic *does* already exist, the
check will allow the `TopicAdmin` logic to simply avoid the complexities of the
AdminClient's behavior and of how `TopicAdmin` handles the response. It is also
more in line with the KIP's stated intention of always using an existing topic
as-is, even when the replication factor in the worker config doesn't match the
topic's replication factor.



> Kafka Connect should check for existence of internal topics before attempting 
> to create them
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8370
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Major
>
> The Connect worker doesn't currently check for the existence of the internal 
> topics, and instead issues a CreateTopic request and handles a 
> TopicExistsException. However, this can cause problems when the number of 
> brokers is fewer than the replication factor, *even if the topic already 
> exists* and the partitions of those topics all remain available on the 
> remaining brokers.
> One problem with the current approach is that the broker checks the requested 
> replication factor before checking for the existence of the topic, resulting 
> in unexpected exceptions when the topic does exist:
> {noformat}
> connect      | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> connect      | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
> connect      |        at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
> connect      |        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
> connect      |        at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
> connect      |        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
> connect      |        at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
> connect      |        at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
> connect      |        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
> connect      |        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> connect      |        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> connect      |        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> connect      |        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> connect      |        at java.lang.Thread.run(Thread.java:748)
> connect      | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect      |        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> connect      |        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> connect      |        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> connect      |        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> connect      |        at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
> connect      |        ... 11 more
> connect      | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect      | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> {noformat}
> Instead of always issuing a CreateTopic request, the worker's admin client 
> should first check whether the topic exists, and if not *then* attempt to 
> create the topic.
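The failure mode described in the issue can be reproduced with a small, self-contained simulation. This is not Kafka code: `FakeBroker`, `createOrFind`, and the exception classes are hypothetical stand-ins, and the broker's validation order (replication factor first, existence second) is taken from the issue's description. It shows that always sending a create request and treating "already exists" as success cannot help, because the "already exists" path is never reached when fewer brokers are live than the requested replication factor.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation of the current create-first approach; not Kafka's API.
public class CreateFirstSketch {

    // Hypothetical stand-in for InvalidReplicationFactorException.
    static class InvalidReplicationFactor extends RuntimeException {
        InvalidReplicationFactor(String msg) { super(msg); }
    }

    // Hypothetical stand-in for TopicExistsException.
    static class TopicExists extends RuntimeException {
        TopicExists(String msg) { super(msg); }
    }

    // Minimal stand-in for the broker's handling of a CreateTopic request.
    static final class FakeBroker {
        final int liveBrokers;
        final Set<String> topics = new HashSet<>();

        FakeBroker(int liveBrokers) { this.liveBrokers = liveBrokers; }

        void createTopic(String name, int replicationFactor) {
            // Validation order as reported in the issue: replication factor first.
            if (replicationFactor > liveBrokers) {
                throw new InvalidReplicationFactor("Replication factor: " + replicationFactor
                        + " larger than available brokers: " + liveBrokers + ".");
            }
            if (!topics.add(name)) {
                throw new TopicExists(name);
            }
        }
    }

    // Current approach: always send a create request and treat "already exists" as success.
    static void createOrFind(FakeBroker broker, String topic, int replicationFactor) {
        try {
            broker.createTopic(topic, replicationFactor);
        } catch (TopicExists e) {
            // Intended path for an existing topic, but unreachable when the
            // replication factor exceeds the number of live brokers.
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker(2);   // one of three brokers is down
        broker.topics.add("connect-offsets");    // the topic already exists
        try {
            createOrFind(broker, "connect-offsets", 3);
            System.out.println("topic found");
        } catch (InvalidReplicationFactor e) {
            System.out.println("worker fails: " + e.getMessage());
        }
    }
}
```

Running `main` prints `worker fails: Replication factor: 3 larger than available brokers: 2.` even though `connect-offsets` already exists, which matches the log above.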



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
