[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040545#comment-17040545
 ] 

Evelyn Bayes commented on KAFKA-9216:
-------------------------------------

I'd be happy to do this.

 

Looking through the code, the worker tries to recreate the log each time it 
starts:

 
{code:java}
public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
  this.lock = new Object();
  this.started = false;
  this.converter = converter;
  this.offset = -1;
  this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
  if (this.topic == null || this.topic.trim().length() == 0)
    throw new ConfigException("Must specify topic for connector configuration.");
  configLog = setupAndCreateKafkaBasedLog(this.topic, config);  // <- here
  this.configTransformer = configTransformer;
}
{code}
The constructor only does preparation on the worker side; the topic is not 
actually "created" until start() is called:

 
{code:java}
public KafkaBasedLog(String topic,
                     Map<String, Object> producerConfigs,
                     Map<String, Object> consumerConfigs,
                     Callback<ConsumerRecord<K, V>> consumedCallback,
                     Time time,
                     Runnable initializer) {
    this.topic = topic;
    this.producerConfigs = producerConfigs;
    this.consumerConfigs = consumerConfigs;
    this.consumedCallback = consumedCallback;
    this.stopRequested = false;
    this.readLogEndOffsetCallbacks = new ArrayDeque<>();
    this.time = time;
    this.initializer = initializer != null ? initializer : new Runnable() {
        @Override
        public void run() {
        }
    };
}

public void start() {                                        // <- here
    log.info("Starting KafkaBasedLog with topic " + topic);

    initializer.run();                                       // <- here
    producer = createProducer();
    consumer = createConsumer();
    // ... (rest of start() not shown)
}
{code}
And the worker just fires and forgets:
{code:java}
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
                                                          Map<String, Object> consumerProps,
                                                          Callback<ConsumerRecord<String, byte[]>> consumedCallback,
                                                          final NewTopic topicDescription, final Map<String, Object> adminProps) {
    Runnable createTopics = new Runnable() {
        @Override
        public void run() {
            log.debug("Creating admin client to manage Connect internal config topic");
            try (TopicAdmin admin = new TopicAdmin(adminProps)) {
                admin.createTopics(topicDescription);
            }
        }
    };
    return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
{code}
As a result, the number of partitions doesn't seem to be exposed anywhere 
except in the utility class TopicAdmin, which is used by both the offset 
backing store and the config backing store.
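
To illustrate the point about fire-and-forget: a Runnable initializer has no 
return value, so anything the topic-creation step learns is dropped unless it 
is stored somewhere the caller can read. The sketch below is plain Java with 
hypothetical names (InitializerSketch, fireAndForget, recording); it is not 
Connect code, and the partition lookup is a stand-in supplier rather than a 
real admin client call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical illustration only; none of these names exist in Kafka Connect.
public class InitializerSketch {

    // Mirrors the current pattern: the lookup runs, but its result is discarded.
    public static Runnable fireAndForget(IntSupplier partitionLookup) {
        return () -> {
            partitionLookup.getAsInt();
        };
    }

    // Alternative: the initializer stores what it learned so the owner can check it later.
    public static Runnable recording(IntSupplier partitionLookup, AtomicInteger sink) {
        return () -> sink.set(partitionLookup.getAsInt());
    }

    public static void main(String[] args) {
        AtomicInteger observed = new AtomicInteger(-1); // -1 means "not known yet"

        fireAndForget(() -> 3).run();       // the value 3 is lost
        System.out.println(observed.get()); // still -1

        recording(() -> 3, observed).run(); // the value 3 is retained
        System.out.println(observed.get()); // now 3
    }
}
```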

 

I see three options:
 * Create a new temporary client and fetch the topic metadata to run the check 
as part of the start-up process in KafkaConfigBackingStore;
 * Modify TopicAdmin, the utility client used to create the topic for 
KafkaConfigBackingStore and KafkaOffsetBackingStore. It doesn't seem to fetch 
that metadata at the moment, but we could have it return details on the 
topic; or
 * Modify KafkaBasedLog, adding a private field to retain the partition count 
and a new method to expose it.



For the second and third options, we'd pass the partition information to 
KafkaConfigBackingStore and run the check there.
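
A rough sketch of what the check itself could look like, whichever option 
supplies the count. Everything here is a placeholder: ConfigTopicCheck and 
verifySinglePartition are made-up names, the topic name is just an example, 
and IllegalStateException stands in for whatever the real code would throw 
(presumably ConfigException, as in the constructor above). The partition 
count is passed in as a plain int.

```java
// Hypothetical sketch; these names are illustrative, not existing Connect APIs.
public class ConfigTopicCheck {

    /** Throws if the config topic does not have exactly one partition. */
    public static void verifySinglePartition(String topic, int partitionCount) {
        if (partitionCount != 1) {
            throw new IllegalStateException(
                "Topic '" + topic + "' must have exactly one partition but has "
                    + partitionCount + ".");
        }
    }

    public static void main(String[] args) {
        // A single-partition topic passes silently.
        verifySinglePartition("connect-configs", 1);

        // Any other count stops start-up with a descriptive error.
        try {
            verifySinglePartition("connect-configs", 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast here would keep the worker from starting, which is what the 
issue description asks for.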


> Enforce connect internal topic configuration at startup
> -------------------------------------------------------
>
>                 Key: KAFKA-9216
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9216
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Priority: Major
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if connect config partition count !=1 ?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
