[ https://issues.apache.org/jira/browse/KAFKA-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16952395#comment-16952395 ]

Chris Egerton commented on KAFKA-8370:
--------------------------------------

Looks like https://issues.apache.org/jira/browse/KAFKA-8125 was duplicated later on by https://issues.apache.org/jira/browse/KAFKA-8875, which has actually already been resolved. So at least on versions that have the fix for 8875, the InvalidReplicationFactorException shouldn't be an issue for Connect.

> Kafka Connect should check for existence of internal topics before attempting
> to create them
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8370
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Major
>
> The Connect worker doesn't currently check for the existence of the internal
> topics, and instead issues a CreateTopic request and handles a
> TopicExistsException. However, this can cause problems when the number of
> brokers is fewer than the replication factor, *even if the topic already
> exists* and the partitions of those topics all remain available on the
> remaining brokers.
> One problem of the current approach is that the broker checks the requested
> replication factor before checking for the existence of the topic, resulting
> in unexpected exceptions when the topic does exist:
> {noformat}
> connect | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> connect | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
> connect | at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
> connect | at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
> connect | at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
> connect | at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
> connect | at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
> connect | at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
> connect | at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
> connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> connect | at java.lang.Thread.run(Thread.java:748)
> connect | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect | at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> connect | at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> connect | at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> connect | at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> connect | at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
> connect | ... 11 more
> connect | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> {noformat}
> Instead of always issuing a CreateTopic request, the worker's admin client
> should first check whether the topic exists, and if not *then* attempt to
> create the topic.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
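
For illustration, here is a minimal sketch of the check-then-create order proposed in the description. It is not the Connect implementation. Everything in it is hypothetical: `TopicOps`, `FakeBroker`, `TopicExists`, `InvalidReplicationFactor` and `ensureTopic` are stand-ins invented for this example, and the fake broker only models the validation order the report describes (replication factor first, existence second). With the real Kafka admin client, the existence check would presumably go through something like `listTopics()` or `describeTopics()` before `createTopics()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch only: none of these types are Kafka or Connect classes. */
final class EnsureTopicSketch {

    /** Hypothetical stand-in for the small part of an admin client the worker needs. */
    interface TopicOps {
        Set<String> existingTopics();
        void createTopic(String name, int partitions, int replicationFactor);
    }

    /** Hypothetical stand-in for TopicExistsException. */
    static final class TopicExists extends RuntimeException {
        TopicExists(String message) { super(message); }
    }

    /** Hypothetical stand-in for InvalidReplicationFactorException. */
    static final class InvalidReplicationFactor extends RuntimeException {
        InvalidReplicationFactor(String message) { super(message); }
    }

    /** In-memory fake that validates the replication factor BEFORE checking existence. */
    static final class FakeBroker implements TopicOps {
        private final int availableBrokers;
        private final Map<String, Integer> topics = new HashMap<>();
        int createRequests = 0;

        FakeBroker(int availableBrokers) { this.availableBrokers = availableBrokers; }

        /** Registers a topic that was created earlier, e.g. when more brokers were up. */
        void seedTopic(String name, int replicationFactor) { topics.put(name, replicationFactor); }

        @Override
        public Set<String> existingTopics() { return new HashSet<>(topics.keySet()); }

        @Override
        public void createTopic(String name, int partitions, int replicationFactor) {
            createRequests++;
            if (replicationFactor > availableBrokers) {
                throw new InvalidReplicationFactor("Replication factor: " + replicationFactor
                        + " larger than available brokers: " + availableBrokers + ".");
            }
            if (topics.containsKey(name)) {
                throw new TopicExists(name);
            }
            topics.put(name, replicationFactor);
        }
    }

    /** Returns true if this call created the topic, false if it already existed. */
    static boolean ensureTopic(TopicOps admin, String name, int partitions, int replicationFactor) {
        if (admin.existingTopics().contains(name)) {
            return false; // topic is already there, so no create request is sent
        }
        try {
            admin.createTopic(name, partitions, replicationFactor);
            return true;
        } catch (TopicExists e) {
            // Check-then-create is not atomic: another worker may have won the race.
            return false;
        }
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker(2);      // only 2 brokers available
        broker.seedTopic("connect-offsets", 3);     // topic already exists with RF 3
        boolean created = ensureTopic(broker, "connect-offsets", 25, 3); // partition count is arbitrary
        System.out.println("created=" + created + ", createRequests=" + broker.createRequests);
    }
}
```

Note that the existence check does not remove the need to handle the "already exists" error, since two workers can still race between the check and the create. It only avoids sending a create request, and therefore the replication factor validation, when the topic is already present.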