[ https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040545#comment-17040545 ]
Evelyn Bayes commented on KAFKA-9216: ------------------------------------- I'd be happy to do this. Looking through the code, the worker tries to recreated the log each time a worker starts: {code:java} public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) { this.lock = new Object(); this.started = false; this.converter = converter; this.offset = -1; this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG); if (this.topic == null || this.topic.trim().length() == 0) throw new ConfigException("Must specify topic for connector configuration."); configLog = setupAndCreateKafkaBasedLog(this.topic, config); <- here this.configTransformer = configTransformer; } {code} The above only does prep on the worker end and it isn't until start() is called that it actually tries to "create" the topic: {code:java} public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs, Callback<ConsumerRecord<K, V>> consumedCallback, Time time, Runnable initializer) { this.topic = topic; this.producerConfigs = producerConfigs; this.consumerConfigs = consumerConfigs; this.consumedCallback = consumedCallback; this.stopRequested = false; this.readLogEndOffsetCallbacks = new ArrayDeque<>(); this.time = time; this.initializer = initializer != null ? initializer : new Runnable() { @Override public void run() { } }; } public void start() { <- here log.info("Starting KafkaBasedLog with topic " + topic); initializer.run(); <- here producer = createProducer(); consumer = createConsumer(); {code} And the worker just fires and forgets: {code:java} private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback, final NewTopic topicDescription, final Map<String, Object> adminProps) { Runnable createTopics = new Runnable() { @Override public void run() { log.debug("Creating admin client to manage Connect internal config topic"); try (TopicAdmin admin = new TopicAdmin(adminProps)) { admin.createTopics(topicDescription); } } }; return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics); } {code} This means that it doesn't seem to expose the number of partitions anywhere, except in the utility class TopicAdmin which is used by the offset backing store and the config backing store. I see three options: * Create a new temporary clients and get the metadata to do a check as part of the starting process in KafkaConfigBackingStore; * Modify TopicAdmin which is a utility client used to create the topic for KafkaConfigBackingStore and KafkaOffsetBackingStore but this doesn't seem to get that metadata at the moment. However, we could get it to return details on the topic; or * Modify KafkaBasedLog with a new method to expose the partition count and create a private variable to retain that information. For the second and third option we'd pass the partition information to KafkaConfigBackingStore and run the check there > Enforce connect internal topic configuration at startup > ------------------------------------------------------- > > Key: KAFKA-9216 > URL: https://issues.apache.org/jira/browse/KAFKA-9216 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Affects Versions: 0.11.0.0 > Reporter: Randall Hauch > Priority: Major > > Users sometimes configure Connect's internal topic for configurations with > more than one partition. One partition is expected, however, and using more > than one leads to weird behavior that is sometimes not easy to spot. > Here's one example of a log message: > {noformat} > "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, > groupId=td-connect-server] Current config state offset 284 does not match > group assignment 274. Forcing rebalance. > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n" > {noformat} > Would it be possible to add a check in the KafkaConfigBackingStore and > prevent the worker from starting if connect config partition count !=1 ? -- This message was sent by Atlassian Jira (v8.3.4#803005)