[GitHub] [kafka] kkonstantine commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-20 Thread GitBox


kkonstantine commented on a change in pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#discussion_r428443080



##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the creation of internal topics.
+ */
+@Category(IntegrationTest.class)
+public class InternalTopicsIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
+
+    private EmbeddedConnectCluster.Builder connectBuilder;
+    private EmbeddedConnectCluster connect;
+    Map<String, String> workerProps = new HashMap<>();
+    Properties brokerProps = new Properties();
+
+    @Before
+    public void setup() {
+        // setup Kafka broker properties
+        brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connectBuilder = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(1)
+                .numBrokers(1)
+                .brokerProps(brokerProps);
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    @Test
+    public void testCreateInternalTopicsWithDefaultSettings() throws InterruptedException {
+        int numWorkers = 1;
+        int numBrokers = 3;
+        connect = new EmbeddedConnectCluster.Builder().name("connect-cluster-1")

Review comment:
   Makes sense. 
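   For reference, a sketch (not the PR's test verbatim) of how a case like this typically brings the embedded cluster up; the builder methods and connect.start() below are the EmbeddedConnectCluster API, and the props fields are the ones initialized in setup():

   ```java
   // Sketch only: one worker, three brokers, reusing the brokerProps/workerProps fields above.
   connect = new EmbeddedConnectCluster.Builder()
           .name("connect-cluster-1")
           .numWorkers(numWorkers)
           .numBrokers(numBrokers)
           .brokerProps(brokerProps)
           .workerProps(workerProps)
           .build();
   // Starting the cluster boots the brokers and the Connect worker; the worker creates
   // its internal config, offset, and status topics as part of startup.
   connect.start();
   ```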

##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -453,11 +453,17 @@ public void putSessionKey(SessionKey sessionKey) {
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
         Map<String, Object> adminProps = new HashMap<>(originals);
-        NewTopic topicDescription = TopicAdmin.defineTopic(topic).
-                compacted().
-                partitions(1).
-                replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
-                build();
+
+        Map<String, Object> topicSettings = null;
+        if (config instanceof DistributedConfig) {
+            topicSettings = ((DistributedConfig) config).configStorageTopicSettings();
+        }

Review comment:
   ```suggestion
           Map<String, Object> topicSettings = config instanceof DistributedConfig
                   ? ((DistributedConfig) config).configStorageTopicSettings()
                   : Collections.emptyMap();
   ```
   I know that `TopicAdmin#defineTopic` checks for `null`, but I think using `null` in place of an empty collection is something to reserve for cases where that optimization actually matters. Wdyt?
   (btw you don't have to use the ternary operator, I just added it to make the suggestion clear).
   
   Also, if you change here, please change in the other files too.
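   For illustration, a sketch (not the PR's final code) of how the call site in KafkaConfigBackingStore could read with the empty-map variant, assuming the NewTopicBuilder exposes a config(Map) method that accepts the possibly empty settings map:

   ```java
   // Sketch only; assumes java.util.Collections is imported.
   // The settings map is applied before the fixed settings, so compaction and the
   // single partition still win over any user-supplied overrides.
   Map<String, Object> topicSettings = config instanceof DistributedConfig
           ? ((DistributedConfig) config).configStorageTopicSettings()
           : Collections.emptyMap();
   NewTopic topicDescription = TopicAdmin.defineTopic(topic)
           .config(topicSettings)   // may be empty, never null
           .compacted()
           .partitions(1)
           .replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
           .build();
   ```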

##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##
@@ -400,6 +424,33 @@ public KeyGenerator getInternalRequestKeyGenerator() {
 }
 }
 
+    private Map<String, Object> topicSettings(String prefix) {
+        Map<String, Object> result = originalsWithPrefix(prefix);
+        if (CONFIG_STORAGE_PREFIX.equals(prefix) && result.containsKey(PARTITIONS_SUFFIX)) {
+            log.warn("Ignoring '{}{}={}' setting, since config topic partitions is always 1",
+                    prefix, PARTITIONS_SUFFIX, result.get("partitions"));
+        }
+        Object removedPolicy = result.remove("cleanup.policy");

Review comment:
   It's one more impor
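   For reference, a sketch of the helper using the TopicConfig constant instead of the "cleanup.policy" string literal (which would indeed need one more import); the follow-up warning shown here is illustrative, not the PR's wording:

   ```java
   // Sketch only; assumes: import org.apache.kafka.common.config.TopicConfig;
   private Map<String, Object> topicSettings(String prefix) {
       Map<String, Object> result = originalsWithPrefix(prefix);
       if (CONFIG_STORAGE_PREFIX.equals(prefix) && result.containsKey(PARTITIONS_SUFFIX)) {
           log.warn("Ignoring '{}{}={}' setting, since config topic partitions is always 1",
                   prefix, PARTITIONS_SUFFIX, result.get(PARTITIONS_SUFFIX));
       }
       // The worker controls the cleanup policy of its internal topics, so any override is dropped.
       Object removedPolicy = result.remove(TopicConfig.CLEANUP_POLICY_CONFIG);
       if (removedPolicy != null) {
           log.warn("Ignoring '{}{}={}' setting, since the worker manages the cleanup policy of its internal topics",
                   prefix, TopicConfig.CLEANUP_POLICY_CONFIG, removedPolicy);
       }
       return result;
   }
   ```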

[GitHub] [kafka] kkonstantine commented on a change in pull request #8654: KAFKA-9931: Implement KIP-605 to expand support for Connect worker internal topic configurations

2020-05-15 Thread GitBox


kkonstantine commented on a change in pull request #8654:
URL: https://github.com/apache/kafka/pull/8654#discussion_r425899194



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##
@@ -66,25 +71,55 @@
     /**
      * Specify the desired number of partitions for the topic.
      *
-     * @param numPartitions the desired number of partitions; must be positive
+     * @param numPartitions the desired number of partitions; must be positive, or -1 to
+     *                      signify using the broker's default
      * @return this builder to allow methods to be chained; never null
      */
     public NewTopicBuilder partitions(int numPartitions) {
+        if (numPartitions == NO_PARTITIONS) {

Review comment:
   This check seems to replicate a bit of what's already happening in `NewTopic`. Should we just pass the value through, given that we already have validations there too?

##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##
@@ -66,25 +71,55 @@
     /**
      * Specify the desired number of partitions for the topic.
      *
-     * @param numPartitions the desired number of partitions; must be positive
+     * @param numPartitions the desired number of partitions; must be positive, or -1 to
+     *                      signify using the broker's default
      * @return this builder to allow methods to be chained; never null
      */
     public NewTopicBuilder partitions(int numPartitions) {
+        if (numPartitions == NO_PARTITIONS) {
+            return defaultPartitions();
+        }
         this.numPartitions = numPartitions;
         return this;
     }
 
+    /**
+     * Specify that the topic's number of partitions should be taken from the broker's
+     * configuration for {@code num.partitions}.
+     *
+     * @return this builder to allow methods to be chained; never null
+     */
+    public NewTopicBuilder defaultPartitions() {
+        this.numPartitions = null;
+        return this;
+    }
+
     /**
      * Specify the desired replication factor for the topic.
      *
-     * @param replicationFactor the desired replication factor; must be positive
+     * @param replicationFactor the desired replication factor; must be positive, or -1 to
+     *                          signify using the broker's default
      * @return this builder to allow methods to be chained; never null
      */
     public NewTopicBuilder replicationFactor(short replicationFactor) {
+        if (replicationFactor == NO_REPLICATION_FACTOR) {

Review comment:
   same question as above
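   To make the comparison concrete, a small stand-alone sketch of what the Admin client's NewTopic already accepts (class and constructor are org.apache.kafka.clients.admin.NewTopic; the topic name and sizes here are arbitrary):

   ```java
   import java.util.Optional;
   import org.apache.kafka.clients.admin.NewTopic;

   public class NewTopicDefaultsExample {
       public static void main(String[] args) {
           // Explicit sizing, as the builder produces when positive values are given.
           NewTopic explicit = new NewTopic("connect-offsets", Optional.of(25), Optional.of((short) 3));
           // Optional.empty() already means "use the broker's num.partitions /
           // default.replication.factor", which is what the -1 sentinel maps to.
           NewTopic brokerDefaults = new NewTopic("connect-offsets", Optional.empty(), Optional.empty());
           System.out.println(explicit + " vs " + brokerDefaults);
       }
   }
   ```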

##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for the creation of internal topics.
+ */
+@Category(IntegrationTest.class)
+public class InternalTopicsIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(InternalTopicsIntegrationTest.class);
+
+    private EmbeddedConnectCluster.Builder connectBuilder;
+    private EmbeddedConnectCluster connect;
+    Map<String, String> workerProps = new HashMap<>();
+    Properties brokerProps = new Properties();
+
+    @Before
+    public void setup() {
+        // setup Kafka broker properties
+        brokerProps.put("auto.create.topics.enable", String.valueOf(false));
+
+        // build a Connect cluster backed by Kafka and Zk
+        connectBuilder = new EmbeddedConnectCluster.