[GitHub] [kafka] kkonstantine commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
kkonstantine commented on a change in pull request #8654: URL: https://github.com/apache/kafka/pull/8654#discussion_r428443080 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the creation of internal topics. 
+ */ +@Category(IntegrationTest.class) +public class InternalTopicsIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class); + +private EmbeddedConnectCluster.Builder connectBuilder; +private EmbeddedConnectCluster connect; +Map workerProps = new HashMap<>(); +Properties brokerProps = new Properties(); + +@Before +public void setup() { +// setup Kafka broker properties +brokerProps.put("auto.create.topics.enable", String.valueOf(false)); + +// build a Connect cluster backed by Kafka and Zk +connectBuilder = new EmbeddedConnectCluster.Builder() +.name("connect-cluster") +.numWorkers(1) +.numBrokers(1) +.brokerProps(brokerProps); +} + +@After +public void close() { +// stop all Connect, Kafka and Zk threads. +connect.stop(); +} + +@Test +public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException { +int numWorkers = 1; +int numBrokers = 3; +connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1") Review comment: Makes sense. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -453,11 +453,17 @@ public void putSessionKey(SessionKey sessionKey) { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); Map adminProps = new HashMap<>(originals); -NewTopic topicDescription = TopicAdmin.defineTopic(topic). -compacted(). -partitions(1). - replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)). -build(); + +Map topicSettings = null; +if (config instanceof DistributedConfig) { +topicSettings = ((DistributedConfig) config).configStorageTopicSettings(); +} Review comment: ```suggestion Map topicSettings = config instanceof DistributedConfig ? 
((DistributedConfig) config).configStorageTopicSettings() : Collections.emptyMap(); ``` I know that `TopicAdmin#defineTopic` checks for `null`, but I think using `null` for collections is best reserved for cases where such an optimization matters. Wdyt? (btw you don't have to use the ternary operator, I just added it to make the suggestion clear). Also, if you change it here, please change it in the other files too. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ## @@ -400,6 +424,33 @@ public KeyGenerator getInternalRequestKeyGenerator() { } } +private Map topicSettings(String prefix) { +Map result = originalsWithPrefix(prefix); +if (CONFIG_STORAGE_PREFIX.equals(prefix) && result.containsKey(PARTITIONS_SUFFIX)) { +log.warn("Ignoring '{}{}={}' setting, since config topic partitions is always 1", prefix, PARTITIONS_SUFFIX, result.get("partitions")); +} +Object removedPolicy = result.remove("cleanup.policy"); Review comment: It's one more impor
[GitHub] [kafka] kkonstantine commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations
kkonstantine commented on a change in pull request #8654: URL: https://github.com/apache/kafka/pull/8654#discussion_r425899194 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java ## @@ -66,25 +71,55 @@ /** * Specify the desired number of partitions for the topic. * - * @param numPartitions the desired number of partitions; must be positive + * @param numPartitions the desired number of partitions; must be positive, or -1 to + * signify using the broker's default * @return this builder to allow methods to be chained; never null */ public NewTopicBuilder partitions(int numPartitions) { +if (numPartitions == NO_PARTITIONS) { Review comment: this check seems to replicate a bit what's happening in `NewTopic`. Should we just pass the value, given that we have validations too already? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java ## @@ -66,25 +71,55 @@ /** * Specify the desired number of partitions for the topic. * - * @param numPartitions the desired number of partitions; must be positive + * @param numPartitions the desired number of partitions; must be positive, or -1 to + * signify using the broker's default * @return this builder to allow methods to be chained; never null */ public NewTopicBuilder partitions(int numPartitions) { +if (numPartitions == NO_PARTITIONS) { +return defaultPartitions(); +} this.numPartitions = numPartitions; return this; } +/** + * Specify the topic's number of partition should be the broker configuration for + * {@code num.partitions}. + * + * @return this builder to allow methods to be chained; never null + */ +public NewTopicBuilder defaultPartitions() { +this.numPartitions = null; +return this; +} + /** * Specify the desired replication factor for the topic. 
* - * @param replicationFactor the desired replication factor; must be positive + * @param replicationFactor the desired replication factor; must be positive, or -1 to + * signify using the broker's default * @return this builder to allow methods to be chained; never null */ public NewTopicBuilder replicationFactor(short replicationFactor) { +if (replicationFactor == NO_REPLICATION_FACTOR) { Review comment: same question as above ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for the creation of internal topics. 
+ */ +@Category(IntegrationTest.class) +public class InternalTopicsIntegrationTest { + +private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class); + +private EmbeddedConnectCluster.Builder connectBuilder; +private EmbeddedConnectCluster connect; +Map workerProps = new HashMap<>(); +Properties brokerProps = new Properties(); + +@Before +public void setup() { +// setup Kafka broker properties +brokerProps.put("auto.create.topics.enable", String.valueOf(false)); + +// build a Connect cluster backed by Kafka and Zk +connectBuilder = new EmbeddedConnectCluster.