Repository: kafka
Updated Branches:
  refs/heads/trunk 51f7a35c9 -> 62b9fa225


KAFKA-3579; Reference both old and new consumer properties in `TopicCommand`

Add references to the new consumer property 'max.partition.fetch.bytes' along 
with the old consumer property 'fetch.message.max.bytes' in the corresponding 
warning messages of TopicCommand.
Also, create and leverage a static variable for the default value of the new 
consumer property.
Also, use 'DEFAULT_...' for default property constant names in the code 
instead of '..._DEFAULT'.

Author: Vahid Hashemian <[email protected]>

Reviewers: Manikumar reddy O <[email protected]>, Ashish Singh 
<[email protected]>, Grant Henke <[email protected]>, Ismael Juma 
<[email protected]>

Closes #1239 from vahidhashemian/KAFKA-3579


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62b9fa22
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62b9fa22
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62b9fa22

Branch: refs/heads/trunk
Commit: 62b9fa22545a8e254b4ffd07ddc5bd3315542548
Parents: 51f7a35
Author: Vahid Hashemian <[email protected]>
Authored: Sun May 8 22:27:58 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Sun May 8 22:27:58 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  7 ++--
 .../internals/ConsumerCoordinatorTest.java      | 14 ++++----
 .../main/scala/kafka/admin/TopicCommand.scala   | 35 +++++++++++---------
 3 files changed, 31 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 69c4a36..6523d18 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -115,6 +115,7 @@ public class ConsumerConfig extends AbstractConfig {
      */
     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = 
"max.partition.fetch.bytes";
     private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum 
amount of data per-partition the server will return. The maximum total memory 
used for a request will be <code>#partitions * 
max.partition.fetch.bytes</code>. This size must be at least as large as the 
maximum message size the server allows or else it is possible for the producer 
to send messages larger than the consumer can fetch. If that happens, the 
consumer can get stuck trying to fetch a large message on a certain partition.";
+    public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 
1024;
 
     /** <code>send.buffer.bytes</code> */
     public static final String SEND_BUFFER_CONFIG = 
CommonClientConfigs.SEND_BUFFER_CONFIG;
@@ -184,7 +185,7 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = 
"exclude.internal.topics";
     private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records 
from internal topics (such as offsets) should be exposed to the consumer. "
                                                             + "If set to 
<code>true</code> the only way to receive records from an internal topic is 
subscribing to it.";
-    public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true;
+    public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
     
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -231,7 +232,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
                                         Type.INT,
-                                        1 * 1024 * 1024,
+                                        DEFAULT_MAX_PARTITION_FETCH_BYTES,
                                         atLeast(0),
                                         Importance.HIGH,
                                         MAX_PARTITION_FETCH_BYTES_DOC)
@@ -332,7 +333,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         MAX_POLL_RECORDS_DOC)
                                 .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
                                         Type.BOOLEAN,
-                                        EXCLUDE_INTERNAL_TOPICS_DEFAULT,
+                                        DEFAULT_EXCLUDE_INTERNAL_TOPICS,
                                         Importance.MEDIUM,
                                         EXCLUDE_INTERNAL_TOPICS_DOC)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 82a854a..fc5c929 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -114,7 +114,7 @@ public class ConsumerCoordinatorTest {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-        this.coordinator = buildCoordinator(metrics, assignors, 
ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, autoCommitEnabled);
+        this.coordinator = buildCoordinator(metrics, assignors, 
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled);
     }
 
     @After
@@ -735,7 +735,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
@@ -761,7 +761,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
@@ -789,7 +789,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignment() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 100);
@@ -807,7 +807,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testAutoCommitManualAssignmentCoordinatorUnknown() {
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), 
assignors,
-                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true);
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 100);
@@ -1096,7 +1096,7 @@ public class ConsumerCoordinatorTest {
 
         try (Metrics metrics = new Metrics(time)) {
             ConsumerCoordinator coordinator = buildCoordinator(metrics, 
Arrays.<PartitionAssignor>asList(roundRobin, range),
-                    ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
+                    ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -1105,7 +1105,7 @@ public class ConsumerCoordinatorTest {
 
         try (Metrics metrics = new Metrics(time)) {
             ConsumerCoordinator coordinator = buildCoordinator(metrics, 
Arrays.<PartitionAssignor>asList(range, roundRobin),
-                    ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
+                    ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(range.name(), metadata.get(0).name());

http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 029adea..e6ebb96 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import java.util.Properties
 import joptsimple._
 import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException}
-import kafka.consumer.{ConsumerConfig, Whitelist}
+import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist}
 import kafka.coordinator.GroupCoordinator
 import kafka.log.{Defaults, LogConfig}
 import kafka.server.ConfigType
@@ -31,6 +31,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConversions._
 import scala.collection._
+import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
 import org.apache.kafka.common.internals.TopicConstants
 
 
@@ -383,30 +384,34 @@ object TopicCommand extends Logging {
   def shortMessageSizeWarning(maxMessageBytes: Int): String = {
     "\n\n" +
       
"*****************************************************************************************************\n"
 +
-      "*** WARNING: you are creating a topic where the max.message.bytes is 
greater than the consumer ***\n" +
-      "*** default. This operation is potentially dangerous. Consumers will 
get failures if their        ***\n" +
-      "*** fetch.message.max.bytes < the value you are using.                  
                          ***\n" +
+      "*** WARNING: you are creating a topic where the max.message.bytes is 
greater than the broker's    ***\n" +
+      "*** default max.message.bytes. This operation is potentially dangerous. 
Consumers will get        ***\n" +
+      s"*** failures if their fetch.message.max.bytes (old consumer) or 
${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}         ***\n"+ 
+      "*** (new consumer) < the value you are using.                           
                          ***\n" +
       
"*****************************************************************************************************\n"
 +
       s"- value set here: $maxMessageBytes\n" +
-      s"- Default Consumer fetch.message.max.bytes: 
${ConsumerConfig.FetchSize}\n" +
+      s"- Default Old Consumer fetch.message.max.bytes: 
${OldConsumerConfig.FetchSize}\n" +
+      s"- Default New Consumer 
${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: 
${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n" +
       s"- Default Broker max.message.bytes: 
${kafka.server.Defaults.MessageMaxBytes}\n\n"
   }
 
   def longMessageSizeWarning(maxMessageBytes: Int): String = {
     "\n\n" +
-      
"****************************************************************************************************\n"
 +
-      "*** WARNING: you are creating a topic where the max.message.bytes is 
greater than the broker      ***\n" +
-      "*** default. This operation is dangerous. There are two potential side 
effects:                  ***\n" +
-      "*** - Consumers will get failures if their fetch.message.max.bytes < 
the value you are using     ***\n" +
-      "*** - Producer requests larger than replica.fetch.max.bytes will not 
replicate and hence have    ***\n" +
-      "***   a higher risk of data loss                                        
                         ***\n" +
-      "*** You should ensure both of these settings are greater than the value 
set here before using    ***\n" +
-      "*** this topic.                                                         
                         ***\n" +
-      
"****************************************************************************************************\n"
 +
+      
"*****************************************************************************************************\n"
 +
+      "*** WARNING: you are creating a topic where the max.message.bytes is 
greater than the broker's    ***\n" +
+      "*** default max.message.bytes. This operation is dangerous. There are 
two potential side effects: ***\n" +
+      "*** - Consumers will get failures if their fetch.message.max.bytes (old 
consumer) or              ***\n" +
+      s"***   ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} (new 
consumer) < the value you are using                          ***\n" +
+      "*** - Producer requests larger than replica.fetch.max.bytes will not 
replicate and hence have     ***\n" +
+      "***   a higher risk of data loss                                        
                          ***\n" +
+      "*** You should ensure both of these settings are greater than the value 
set here before using     ***\n" +
+      "*** this topic.                                                         
                          ***\n" +
+      
"*****************************************************************************************************\n"
 +
       s"- value set here: $maxMessageBytes\n" +
       s"- Default Broker replica.fetch.max.bytes: 
${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" +
       s"- Default Broker max.message.bytes: 
${kafka.server.Defaults.MessageMaxBytes}\n" +
-      s"- Default Consumer fetch.message.max.bytes: 
${ConsumerConfig.FetchSize}\n\n"
+      s"- Default Old Consumer fetch.message.max.bytes: 
${OldConsumerConfig.FetchSize}\n" +
+      s"- Default New Consumer 
${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: 
${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n\n"
   }
 }
 

Reply via email to