[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672080#comment-16672080 ]
ASF GitHub Bot commented on KAFKA-7509: --------------------------------------- rhauch closed pull request #5802: KAFKA-7509: Reduce unnecessary and misleading “configuration supplied but not known” warning messages in Connect URL: https://github.com/apache/kafka/pull/5802 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java index 058c491672a..ebd10d45689 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java @@ -185,6 +185,20 @@ protected AdminClientConfig(Map<?, ?> props, boolean doLog) { return CONFIG.names(); } + /** + * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to + * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}. 
+ * + * @param name the property name + * @return true if the supplied name matches a known property, or false if it is unknown + */ + public static boolean isKnownConfig(String name) { + if (name == null) { + return false; + } + return configNames().contains(name) || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG); + } + public static void main(String[] args) { System.out.println(CONFIG.toHtmlTable()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index ddd6e06c713..c62c574ad80 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -507,6 +507,22 @@ protected ConsumerConfig(Map<?, ?> props, boolean doLog) { return CONFIG.names(); } + /** + * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to + * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}. 
+ * + * @param name the property name + * @return true if the supplied name matches a known property, or false if it is unknown + */ + public static boolean isKnownConfig(String name) { + if (name == null) { + return false; + } + return configNames().contains(name) + || name.startsWith(KEY_DESERIALIZER_CLASS_CONFIG) || name.startsWith(VALUE_DESERIALIZER_CLASS_CONFIG) + || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG) || name.startsWith(INTERCEPTOR_CLASSES_CONFIG); + } + public static void main(String[] args) { System.out.println(CONFIG.toHtmlTable()); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 6142519c4dc..515a54888e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -392,6 +392,23 @@ public ProducerConfig(Map<String, Object> props) { return CONFIG.names(); } + /** + * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to + * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}. 
+ * + * @param name the property name + * @return true if the supplied name matches a known property, or false if it is unknown + */ + public static boolean isKnownConfig(String name) { + if (name == null) { + return false; + } + return configNames().contains(name) + || name.startsWith(KEY_SERIALIZER_CLASS_CONFIG) || name.startsWith(VALUE_SERIALIZER_CLASS_CONFIG) + || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG) || name.startsWith(INTERCEPTOR_CLASSES_CONFIG) + || name.startsWith(PARTITIONER_CLASS_CONFIG); + } + public static void main(String[] args) { System.out.println(CONFIG.toHtmlTable()); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index df73a434d31..85a8d320d0c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -49,6 +49,7 @@ import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index 23122699783..37b301e0d2c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java @@ -17,9 +17,11 @@ package org.apache.kafka.connect.runtime.errors; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import 
org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.header.Headers; @@ -27,6 +29,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.SinkConnectorConfig; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +80,8 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, ErrorHandlingMetrics errorHandlingMetrics) { String topic = sinkConfig.dlqTopicName(); - try (AdminClient admin = AdminClient.create(workerConfig.originals())) { + Map<String, Object> adminConfig = ConnectUtils.retainConfigs(workerConfig.originals(), AdminClientConfig::isKnownConfig); + try (AdminClient admin = AdminClient.create(adminConfig)) { if (!admin.listTopics().names().get().contains(topic)) { log.error("Topic {} doesn't exist. 
Will attempt to create topic.", topic); NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor()); @@ -91,6 +95,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, } } + ConnectUtils.retainConfigs(producerProps, ProducerConfig::isKnownConfig); KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps); return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index e7ee632638d..2c1c8e19d4f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.storage; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -39,6 +40,7 @@ import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; @@ -416,9 +418,14 @@ public void putTargetState(String connector, TargetState state) { producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE); + // Prevent logging unused config warnings + ConnectUtils.retainConfigs(producerProps, ProducerConfig::isKnownConfig); + Map<String, Object> consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + // Prevent logging unused config warnings + ConnectUtils.retainConfigs(consumerProps, ConsumerConfig::isKnownConfig); Map<String, Object> adminProps = new HashMap<>(originals); NewTopic topicDescription = TopicAdmin.defineTopic(topic). diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java index 195c498edb7..b2833ae384f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.storage; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -28,6 +29,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConvertingFutureCallback; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.TopicAdmin; @@ -72,10 +74,14 @@ public void configure(final WorkerConfig config) { producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); 
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); + // Prevent logging unused config warnings + ConnectUtils.retainConfigs(producerProps, ProducerConfig::isKnownConfig); Map<String, Object> consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + // Prevent logging unused config warnings + ConnectUtils.retainConfigs(consumerProps, ConsumerConfig::isKnownConfig); Map<String, Object> adminProps = new HashMap<>(originals); NewTopic topicDescription = TopicAdmin.defineTopic(topic). diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index 6710808f9a9..4afeaed7401 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.storage; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -39,6 +40,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.KafkaBasedLog; import org.apache.kafka.connect.util.Table; @@ -129,10 +131,14 @@ public void configure(final WorkerConfig config) { 
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class + // Prevent logging unused config warnings + ConnectUtils.retainConfigs(producerProps, ProducerConfig::isKnownConfig); Map<String, Object> consumerProps = new HashMap<>(originals); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + // Prevent logging unused config warnings + ConnectUtils.retainConfigs(consumerProps, ConsumerConfig::isKnownConfig); Map<String, Object> adminProps = new HashMap<>(originals); NewTopic topicDescription = TopicAdmin.defineTopic(topic). @@ -141,12 +147,7 @@ public void configure(final WorkerConfig config) { replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)). 
build(); - Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() { - @Override - public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) { - read(record); - } - }; + Callback<ConsumerRecord<String, byte[]>> readCallback = (error, record) -> read(record); this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java index 9f30236fdee..9dbfc8ab628 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java @@ -17,6 +17,9 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.RecordBatch; @@ -25,11 +28,34 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; public final class ConnectUtils { private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class); + protected static final String[] PRODUCER_CONFIG_PREFIXES = { + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, + ProducerConfig.PARTITIONER_CLASS_CONFIG + }; + + protected static final String[] CONSUMER_CONFIG_PREFIXES = { + 
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, + }; + + protected static final String[] ADMIN_CONFIG_PREFIXES = { + AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + }; + public static Long checkAndConvertTimestamp(Long timestamp) { if (timestamp == null || timestamp >= 0) return timestamp; @@ -65,4 +91,26 @@ static String lookupKafkaClusterId(AdminClient adminClient) { + "Check worker's broker connection and security properties.", e); } } + + /** + * Modify the supplied map of configurations to retain only those configuration name-value pairs that satisfy the supplied predicate. + * + * @param configs the map of configurations to be modified; may not be null + * @param isValid a function that is used to determine which configuration properties to retain; may not be null + * @return the supplied {@code configs} parameter, returned for convenience + * @see ProducerConfig#isKnownConfig(String) + * @see ConsumerConfig#isKnownConfig(String) + * @see AdminClientConfig#isKnownConfig(String) + */ + public static Map<String, Object> retainConfigs(Map<String, Object> configs, Predicate<String> isValid) { + Iterator<Entry<String, Object>> entryIter = configs.entrySet().iterator(); + while (entryIter.hasNext()) { + Map.Entry<String, Object> entry = entryIter.next(); + if (!isValid.test(entry.getKey())) { + log.debug("Not retaining the '{}' config property when passing to a subcomponent", entry.getKey()); + entryIter.remove(); + } + } + return configs; + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index ad21561baf2..b8fcd256fcb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -165,7 
+165,8 @@ public static NewTopicBuilder defineTopic(String topicName) { * @param adminConfig the configuration for the {@link AdminClient} */ public TopicAdmin(Map<String, Object> adminConfig) { - this(adminConfig, AdminClient.create(adminConfig)); + // Prevent logging unused config warnings + this(adminConfig, AdminClient.create(ConnectUtils.retainConfigs(adminConfig, AdminClientConfig::isKnownConfig))); } // visible for testing diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java index 6be3525380b..c14f126a527 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java @@ -16,16 +16,22 @@ */ package org.apache.kafka.connect.util; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.common.Node; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter; import org.junit.Test; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class ConnectUtilsTest { @@ -60,4 +66,33 @@ public void testLookupKafkaClusterIdTimeout() { ConnectUtils.lookupKafkaClusterId(adminClient); } + @Test + public void removeNonAdminClientConfigurations() { + Map<String, Object> configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap1"); + configs.put(AdminClientConfig.CLIENT_ID_CONFIG, "clientId"); + configs.put(AdminClientConfig.RETRIES_CONFIG, "1"); + configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100"); + 
configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom", "customValue"); + configs.put("some.other.property", "value"); + configs.put("other.property", "value2"); + Map<String, Object> filtered = ConnectUtils.retainConfigs(new HashMap<>(configs), AdminClientConfig::isKnownConfig); + assertEquals(configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + filtered.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(configs.get(AdminClientConfig.CLIENT_ID_CONFIG), + filtered.get(AdminClientConfig.CLIENT_ID_CONFIG)); + assertEquals(configs.get(AdminClientConfig.RETRIES_CONFIG), + filtered.get(AdminClientConfig.RETRIES_CONFIG)); + assertEquals(configs.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), + filtered.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG)); + assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG), + filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG)); + assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom"), + filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom")); + assertFalse(filtered.containsKey("some.other.property")); + assertFalse(filtered.containsKey("other.property")); + assertEquals(configs.size() - 2, filtered.size()); + assertTrue(configs.keySet().containsAll(filtered.keySet())); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect logs unnecessary warnings about unused configurations > ------------------------------------------------------------------- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Affects Versions: 0.10.2.0 > Reporter: Randall Hauch > Assignee: Randall Hauch > Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > The problem is complicated by the fact that the Producer, Consumer, and AdminClient each > create their own AbstractConfig instance within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to pass to the Producer, Consumer, and AdminClient only those > configuration properties that the ProducerConfig, ConsumerConfig, and > AdminClientConfig ConfigDefs know about. -- This message was sent by Atlassian JIRA (v7.6.3#76005)