[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672080#comment-16672080
 ] 

ASF GitHub Bot commented on KAFKA-7509:
---------------------------------------

rhauch closed pull request #5802: KAFKA-7509: Reduce unnecessary and misleading "configuration supplied but not known" warning messages in Connect
URL: https://github.com/apache/kafka/pull/5802
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 058c491672a..ebd10d45689 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -185,6 +185,20 @@ protected AdminClientConfig(Map<?, ?> props, boolean doLog) {
         return CONFIG.names();
     }
 
+    /**
+     * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to
+     * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}.
+     *
+     * @param name the property name
+     * @return true if the supplied name matches a known property, or false if it is unknown
+     */
+    public static boolean isKnownConfig(String name) {
+        if (name == null) {
+            return false;
+        }
+        return configNames().contains(name) || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG);
+    }
+
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ddd6e06c713..c62c574ad80 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -507,6 +507,22 @@ protected ConsumerConfig(Map<?, ?> props, boolean doLog) {
         return CONFIG.names();
     }
 
+    /**
+     * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to
+     * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}.
+     *
+     * @param name the property name
+     * @return true if the supplied name matches a known property, or false if it is unknown
+     */
+    public static boolean isKnownConfig(String name) {
+        if (name == null) {
+            return false;
+        }
+        return configNames().contains(name)
+               || name.startsWith(KEY_DESERIALIZER_CLASS_CONFIG) || name.startsWith(VALUE_DESERIALIZER_CLASS_CONFIG)
+               || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG) || name.startsWith(INTERCEPTOR_CLASSES_CONFIG);
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 6142519c4dc..515a54888e2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -392,6 +392,23 @@ public ProducerConfig(Map<String, Object> props) {
         return CONFIG.names();
     }
 
+    /**
+     * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to
+     * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}.
+     *
+     * @param name the property name
+     * @return true if the supplied name matches a known property, or false if it is unknown
+     */
+    public static boolean isKnownConfig(String name) {
+        if (name == null) {
+            return false;
+        }
+        return configNames().contains(name)
+               || name.startsWith(KEY_SERIALIZER_CLASS_CONFIG) || name.startsWith(VALUE_SERIALIZER_CLASS_CONFIG)
+               || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG) || name.startsWith(INTERCEPTOR_CLASSES_CONFIG)
+               || name.startsWith(PARTITIONER_CLASS_CONFIG);
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index df73a434d31..85a8d320d0c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -49,6 +49,7 @@
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 23122699783..37b301e0d2c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.connect.runtime.errors;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.header.Headers;
@@ -27,6 +29,7 @@
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,7 +80,8 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
                                                          ErrorHandlingMetrics errorHandlingMetrics) {
         String topic = sinkConfig.dlqTopicName();
 
-        try (AdminClient admin = AdminClient.create(workerConfig.originals())) {
+        Map<String, Object> adminConfig = ConnectUtils.retainConfigs(workerConfig.originals(), AdminClientConfig::isKnownConfig);
+        try (AdminClient admin = AdminClient.create(adminConfig)) {
             if (!admin.listTopics().names().get().contains(topic)) {
                 log.error("Topic {} doesn't exist. Will attempt to create topic.", topic);
                 NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
@@ -91,6 +95,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig,
             }
         }
 
+        ConnectUtils.retainConfigs(producerProps, ProducerConfig::isKnownConfig);
         KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps);
         return new DeadLetterQueueReporter(dlqProducer, sinkConfig, id, errorHandlingMetrics);
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index e7ee632638d..2c1c8e19d4f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -39,6 +40,7 @@
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -416,9 +418,14 @@ public void putTargetState(String connector, TargetState 
state) {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+        // Prevent logging unused config warnings
+        ConnectUtils.retainConfigs(producerProps, 
ProducerConfig::isKnownConfig);
+
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        // Prevent logging unused config warnings
+        ConnectUtils.retainConfigs(consumerProps, 
ConsumerConfig::isKnownConfig);
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 195c498edb7..b2833ae384f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -28,6 +29,7 @@
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
@@ -72,10 +74,14 @@ public void configure(final WorkerConfig config) {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
+        // Prevent logging unused config warnings
+        ConnectUtils.retainConfigs(producerProps, 
ProducerConfig::isKnownConfig);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        // Prevent logging unused config warnings
+        ConnectUtils.retainConfigs(consumerProps, 
ConsumerConfig::isKnownConfig);
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 6710808f9a9..4afeaed7401 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -39,6 +40,7 @@
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.Table;
@@ -129,10 +131,14 @@ public void configure(final WorkerConfig config) {
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle 
retries in this class
+        // Prevent logging unused config warnings
+        ConnectUtils.retainConfigs(producerProps, 
ProducerConfig::isKnownConfig);
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        // Prevent logging unused config warnings
+        ConnectUtils.retainConfigs(consumerProps, 
ConsumerConfig::isKnownConfig);
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
@@ -141,12 +147,7 @@ public void configure(final WorkerConfig config) {
                 
replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)).
                 build();
 
-        Callback<ConsumerRecord<String, byte[]>> readCallback = new 
Callback<ConsumerRecord<String, byte[]>>() {
-            @Override
-            public void onCompletion(Throwable error, ConsumerRecord<String, 
byte[]> record) {
-                read(record);
-            }
-        };
+        Callback<ConsumerRecord<String, byte[]>> readCallback = (error, 
record) -> read(record);
         this.kafkaLog = createKafkaBasedLog(topic, producerProps, 
consumerProps, readCallback, topicDescription, adminProps);
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 9f30236fdee..9dbfc8ab628 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
@@ -25,11 +28,34 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
 
 public final class ConnectUtils {
     private static final Logger log = 
LoggerFactory.getLogger(ConnectUtils.class);
 
+    protected static final String[] PRODUCER_CONFIG_PREFIXES = {
+            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+            ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            ProducerConfig.PARTITIONER_CLASS_CONFIG
+    };
+
+    protected static final String[] CONSUMER_CONFIG_PREFIXES = {
+            ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            };
+
+    protected static final String[] ADMIN_CONFIG_PREFIXES = {
+            AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG
+    };
+
     public static Long checkAndConvertTimestamp(Long timestamp) {
         if (timestamp == null || timestamp >= 0)
             return timestamp;
@@ -65,4 +91,26 @@ static String lookupKafkaClusterId(AdminClient adminClient) {
                                        + "Check worker's broker connection and security properties.", e);
         }
     }
+
+    /**
+     * Modify the supplied map of configurations to retain only those configuration name-value pairs that satisfy the supplied predicate.
+     *
+     * @param configs the map of configurations to be modified; may not be null
+     * @param isValid a function that is used to determine which configuration properties to retain; may not be null
+     * @return the supplied {@code configs} parameter, returned for convenience
+     * @see ProducerConfig#isKnownConfig(String)
+     * @see ConsumerConfig#isKnownConfig(String)
+     * @see AdminClientConfig#isKnownConfig(String)
+     */
+    public static Map<String, Object> retainConfigs(Map<String, Object> configs, Predicate<String> isValid) {
+        Iterator<Entry<String, Object>> entryIter = configs.entrySet().iterator();
+        while (entryIter.hasNext()) {
+            Map.Entry<String, Object> entry = entryIter.next();
+            if (!isValid.test(entry.getKey())) {
+                log.debug("Not retaining the '{}' config property when passing to a subcomponent", entry.getKey());
+                entryIter.remove();
+            }
+        }
+        return configs;
+    }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index ad21561baf2..b8fcd256fcb 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -165,7 +165,8 @@ public static NewTopicBuilder defineTopic(String topicName) {
      * @param adminConfig the configuration for the {@link AdminClient}
      */
     public TopicAdmin(Map<String, Object> adminConfig) {
-        this(adminConfig, AdminClient.create(adminConfig));
+        // Prevent logging unused config warnings
+        this(adminConfig, AdminClient.create(ConnectUtils.retainConfigs(adminConfig, AdminClientConfig::isKnownConfig)));
     }
 
     // visible for testing
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index 6be3525380b..c14f126a527 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -16,16 +16,22 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class ConnectUtilsTest {
 
@@ -60,4 +66,33 @@ public void testLookupKafkaClusterIdTimeout() {
         ConnectUtils.lookupKafkaClusterId(adminClient);
     }
 
+    @Test
+    public void removeNonAdminClientConfigurations() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap1");
+        configs.put(AdminClientConfig.CLIENT_ID_CONFIG, "clientId");
+        configs.put(AdminClientConfig.RETRIES_CONFIG, "1");
+        configs.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100");
+        configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, 
MockMetricsReporter.class.getName());
+        configs.put(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + 
".custom", "customValue");
+        configs.put("some.other.property", "value");
+        configs.put("other.property", "value2");
+        Map<String, Object> filtered = ConnectUtils.retainConfigs(new 
HashMap<>(configs), AdminClientConfig::isKnownConfig);
+        assertEquals(configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                     filtered.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.CLIENT_ID_CONFIG),
+                     filtered.get(AdminClientConfig.CLIENT_ID_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.RETRIES_CONFIG),
+                     filtered.get(AdminClientConfig.RETRIES_CONFIG));
+        assertEquals(configs.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG),
+                     
filtered.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG));
+        
assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG),
+                     
filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG));
+        
assertEquals(configs.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + 
".custom"),
+                     
filtered.get(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG + ".custom"));
+        assertFalse(filtered.containsKey("some.other.property"));
+        assertFalse(filtered.containsKey("other.property"));
+        assertEquals(configs.size() - 2, filtered.size());
+        assertTrue(configs.keySet().containsAll(filtered.keySet()));
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect logs unnecessary warnings about unused configurations
> -------------------------------------------------------------------
>
>                 Key: KAFKA-7509
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7509
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to