Randall Hauch created KAFKA-8370:
------------------------------------

             Summary: Kafka Connect should check for existence of internal topics before attempting to create them
                 Key: KAFKA-8370
                 URL: https://issues.apache.org/jira/browse/KAFKA-8370
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 0.11.0.0
            Reporter: Randall Hauch
            Assignee: Randall Hauch


The Connect worker doesn't currently check for the existence of the internal
topics. Instead, it issues a CreateTopics request and handles the resulting
TopicExistsException. However, this can cause problems when the number of
available brokers is less than the replication factor, *even if the topic
already exists* and the partitions of those topics all remain available on the
remaining brokers.

One problem with the current approach is that the broker validates the requested
replication factor before checking whether the topic exists, which results in
unexpected exceptions when the topic does exist:

{noformat}
connect      | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
connect      | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
connect      |  at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
connect      |  at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
connect      |  at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
connect      |  at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
connect      |  at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
connect      |  at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
connect      |  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
connect      |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect      |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect      |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect      |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect      |  at java.lang.Thread.run(Thread.java:748)
connect      | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
connect      |  at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
connect      |  at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
connect      |  at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
connect      |  at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
connect      |  at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
connect      |  ... 11 more
connect      | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
connect      | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
{noformat}
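The failure mode can be illustrated with a small, self-contained Java model. Everything in it is hypothetical: `FakeCluster`, `ensureTopicByCreating`, and the two exception classes are stand-ins written for this sketch (they only borrow the names of Kafka's exceptions), and the model simply assumes the broker rejects the replication factor before it looks for an existing topic, as reported above. A topic created while three brokers were up cannot be "re-created" once only two remain, so the create-and-catch approach never reaches its TopicExistsException branch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory model of the current "create, then catch TopicExistsException" approach.
// None of these classes are Kafka's; they only mimic the behavior reported in this issue.
class Main {

    static class TopicExistsException extends RuntimeException {
        TopicExistsException(String msg) { super(msg); }
    }

    static class InvalidReplicationFactorException extends RuntimeException {
        InvalidReplicationFactorException(String msg) { super(msg); }
    }

    // Stand-in for a cluster that validates a create request in the reported order.
    static class FakeCluster {
        int availableBrokers;
        final Map<String, Short> topics = new HashMap<>();

        FakeCluster(int availableBrokers) { this.availableBrokers = availableBrokers; }

        void createTopic(String name, short replicationFactor) {
            // 1. The replication factor is checked first...
            if (replicationFactor > availableBrokers) {
                throw new InvalidReplicationFactorException("Replication factor: "
                        + replicationFactor + " larger than available brokers: "
                        + availableBrokers + ".");
            }
            // 2. ...and only then is the topic's existence checked.
            if (topics.containsKey(name)) {
                throw new TopicExistsException("Topic '" + name + "' already exists.");
            }
            topics.put(name, replicationFactor);
        }
    }

    // Current worker behavior: always try to create and treat "already exists" as success.
    // Returns true if the topic was newly created, false if it already existed.
    // InvalidReplicationFactorException is not caught and propagates to the caller.
    static boolean ensureTopicByCreating(FakeCluster cluster, String name, short rf) {
        try {
            cluster.createTopic(name, rf);
            return true;
        } catch (TopicExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster(3);
        System.out.println("created: " + ensureTopicByCreating(cluster, "connect-offsets", (short) 3));

        cluster.availableBrokers = 2; // one broker is down, but the topic still exists
        try {
            ensureTopicByCreating(cluster, "connect-offsets", (short) 3);
        } catch (InvalidReplicationFactorException e) {
            System.out.println("worker startup fails: " + e.getMessage());
        }
    }
}
```

Running `main` prints `created: true` followed by `worker startup fails: Replication factor: 3 larger than available brokers: 2.`, mirroring the root cause in the log.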

Instead of always issuing a CreateTopics request, the worker's admin client
should first check whether the topic exists, and only if it does not *then*
attempt to create the topic.
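A minimal sketch of the proposed ordering, using the same kind of hypothetical in-memory model (`FakeCluster`, `listTopicNames`, `ensureTopicCheckingFirst`, and the exception classes are illustrative stand-ins, not Kafka classes), might look like this. It assumes a read-only existence lookup is available and is not subject to replication-factor validation. Under that assumption, restarting against two brokers no longer fails for a topic that already exists, while a topic that really is missing still goes through the broker's normal validation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, in-memory model of the proposed "check, then create" approach.
// FakeCluster and ensureTopicCheckingFirst are illustrative stand-ins, not Kafka APIs.
class Main {

    static class TopicExistsException extends RuntimeException {
        TopicExistsException(String msg) { super(msg); }
    }

    static class InvalidReplicationFactorException extends RuntimeException {
        InvalidReplicationFactorException(String msg) { super(msg); }
    }

    static class FakeCluster {
        int availableBrokers;
        final Map<String, Short> topics = new HashMap<>();

        FakeCluster(int availableBrokers) { this.availableBrokers = availableBrokers; }

        // Read-only lookup; plays the role of listing or describing topics.
        Set<String> listTopicNames() { return new HashSet<>(topics.keySet()); }

        // Same validation order as reported in the issue: replication factor first.
        void createTopic(String name, short replicationFactor) {
            if (replicationFactor > availableBrokers) {
                throw new InvalidReplicationFactorException("Replication factor: "
                        + replicationFactor + " larger than available brokers: "
                        + availableBrokers + ".");
            }
            if (topics.containsKey(name)) {
                throw new TopicExistsException("Topic '" + name + "' already exists.");
            }
            topics.put(name, replicationFactor);
        }
    }

    // Proposed behavior: only send a create request for a topic that is not already present.
    // Returns true if the topic was newly created, false if it already existed.
    static boolean ensureTopicCheckingFirst(FakeCluster cluster, String name, short rf) {
        if (cluster.listTopicNames().contains(name)) {
            return false; // no create request, so the replication factor is never validated
        }
        try {
            cluster.createTopic(name, rf);
            return true;
        } catch (TopicExistsException e) {
            // Another worker may have created the topic between the check and the create.
            return false;
        }
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster(3);
        System.out.println("created: " + ensureTopicCheckingFirst(cluster, "connect-offsets", (short) 3));

        cluster.availableBrokers = 2; // one broker is down, but the topic still exists
        System.out.println("created: " + ensureTopicCheckingFirst(cluster, "connect-offsets", (short) 3));
    }
}
```

Running `main` prints `created: true` and then `created: false`, with no exception on the second call. In the real Java client the existence check would presumably use AdminClient's `listTopics()` or `describeTopics()` before `createTopics()`; the sketch still catches TopicExistsException because the check and the create are not atomic when several workers start at once.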



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
