[ 
https://issues.apache.org/jira/browse/KAFKA-6382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329260#comment-16329260
 ] 

ASF GitHub Bot commented on KAFKA-6382:
---------------------------------------

mjsax closed pull request #4341: KAFKA-6382: Make ProducerConfig and 
ConsumerConfig constructors public
URL: https://github.com/apache/kafka/pull/4341
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index be3077ffe17..3fe58d7576a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -456,8 +456,7 @@
     public static Map<String, Object> addDeserializerToConfig(Map<String, 
Object> configs,
                                                               Deserializer<?> 
keyDeserializer,
                                                               Deserializer<?> 
valueDeserializer) {
-        Map<String, Object> newConfigs = new HashMap<String, Object>();
-        newConfigs.putAll(configs);
+        Map<String, Object> newConfigs = new HashMap<>(configs);
         if (keyDeserializer != null)
             newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass());
         if (valueDeserializer != null)
@@ -477,7 +476,11 @@ public static Properties 
addDeserializerToConfig(Properties properties,
         return newProperties;
     }
 
-    ConsumerConfig(Map<?, ?> props) {
+    public ConsumerConfig(Properties props) {
+        super(CONFIG, props);
+    }
+
+    public ConsumerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 14f405bca6e..4e67fe8323e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -267,7 +267,7 @@
      * @param configs   The producer configs
      *
      */
-    public KafkaProducer(Map<String, Object> configs) {
+    public KafkaProducer(final Map<String, Object> configs) {
         this(new ProducerConfig(configs), null, null, null, null);
     }
 
@@ -286,7 +286,10 @@ public KafkaProducer(Map<String, Object> configs) {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> 
keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, 
keySerializer, valueSerializer)),
-                keySerializer, valueSerializer, null, null);
+            keySerializer,
+            valueSerializer,
+            null,
+            null);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f9bf3f47723..0631814cda9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -347,7 +347,8 @@
     }
 
     public static Properties addSerializerToConfig(Properties properties,
-                                                   Serializer<?> 
keySerializer, Serializer<?> valueSerializer) {
+                                                   Serializer<?> keySerializer,
+                                                   Serializer<?> 
valueSerializer) {
         Properties newProperties = new Properties();
         newProperties.putAll(properties);
         if (keySerializer != null)
@@ -357,7 +358,11 @@ public static Properties addSerializerToConfig(Properties 
properties,
         return newProperties;
     }
 
-    ProducerConfig(Map<?, ?> props) {
+    public ProducerConfig(Properties props) {
+        super(CONFIG, props);
+    }
+
+    public ProducerConfig(Map<String, Object> props) {
         super(CONFIG, props);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b4908e85d82..0feb48ddecd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -46,6 +46,7 @@
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -765,8 +766,8 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
         
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 
getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
 
         // add admin retries configs for creating topics
-        final AdminClientConfig config = new 
AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, 
AdminClientConfig.configNames()));
-        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), 
config.getInt(AdminClientConfig.RETRIES_CONFIG));
+        final AdminClientConfig adminClientDefaultConfig = new 
AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, 
AdminClientConfig.configNames()));
+        consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), 
adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG));
 
         // verify that producer batch config is no larger than segment size, 
then add topic configs required for creating topics
         final Map<String, Object> topicProps = 
originalsWithPrefix(TOPIC_PREFIX, false);
@@ -774,7 +775,13 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
         if 
(topicProps.containsKey(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG))) {
             final int segmentSize = 
Integer.parseInt(topicProps.get(topicPrefix(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG)).toString());
             final Map<String, Object> producerProps = 
getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
-            final int batchSize = 
producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG) ? 
Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString())
 : 16384;
+            final int batchSize;
+            if (producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG)) {
+                batchSize = 
Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
+            } else {
+                final ProducerConfig producerDefaultConfig = new 
ProducerConfig(new Properties());
+                batchSize = 
producerDefaultConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG);
+            }
 
             if (segmentSize < batchSize) {
                 throw new IllegalArgumentException(String.format("Specified 
topic segment size %d is is smaller than the configured producer batch size %d, 
this will cause produced batch not able to be appended to the topic",


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Make ProducerConfig and ConsumerConfig constructors public
> ----------------------------------------------------------
>
>                 Key: KAFKA-6382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6382
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>              Labels: kip
>             Fix For: 1.1.0
>
>
> Constructors of {{ProducerConfig}} and {{ConsumerConfig}} are not public atm. 
> This makes it hard to access default values for configurations. For Kafka 
> Streams, we need access to those default value sometime and are forces to 
> hardcode them via c&p atm.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-243%3A+Make+ProducerConfig+and+ConsumerConfig+constructors+public



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to