Repository: kafka Updated Branches: refs/heads/trunk 51f7a35c9 -> 62b9fa225
KAFKA-3579; Reference both old and new consumer properties in `TopicCommand` Add references to the new consumer property 'max.partition.fetch.bytes' along with the old consumer property 'fetch.message.max.bytes' in the corresponding warning messages of TopicCommand. Also, create and leverage a static variable for the default value of the new consumer property. Also, use 'DEFAULT_...' for default property constant names in the code instead of '..._DEFAULT'. Author: Vahid Hashemian <[email protected]> Reviewers: Manikumar reddy O <[email protected]>, Ashish Singh <[email protected]>, Grant Henke <[email protected]>, Ismael Juma <[email protected]> Closes #1239 from vahidhashemian/KAFKA-3579 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62b9fa22 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62b9fa22 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62b9fa22 Branch: refs/heads/trunk Commit: 62b9fa22545a8e254b4ffd07ddc5bd3315542548 Parents: 51f7a35 Author: Vahid Hashemian <[email protected]> Authored: Sun May 8 22:27:58 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Sun May 8 22:27:58 2016 +0100 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerConfig.java | 7 ++-- .../internals/ConsumerCoordinatorTest.java | 14 ++++---- .../main/scala/kafka/admin/TopicCommand.scala | 35 +++++++++++--------- 3 files changed, 31 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 69c4a36..6523d18 100644 
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -115,6 +115,7 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes"; private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition."; + public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024; /** <code>send.buffer.bytes</code> */ public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG; @@ -184,7 +185,7 @@ public class ConsumerConfig extends AbstractConfig { public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics"; private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. 
" + "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it."; - public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true; + public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true; static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, @@ -231,7 +232,7 @@ public class ConsumerConfig extends AbstractConfig { CommonClientConfigs.CLIENT_ID_DOC) .define(MAX_PARTITION_FETCH_BYTES_CONFIG, Type.INT, - 1 * 1024 * 1024, + DEFAULT_MAX_PARTITION_FETCH_BYTES, atLeast(0), Importance.HIGH, MAX_PARTITION_FETCH_BYTES_DOC) @@ -332,7 +333,7 @@ public class ConsumerConfig extends AbstractConfig { MAX_POLL_RECORDS_DOC) .define(EXCLUDE_INTERNAL_TOPICS_CONFIG, Type.BOOLEAN, - EXCLUDE_INTERNAL_TOPICS_DEFAULT, + DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC) http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 82a854a..fc5c929 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -114,7 +114,7 @@ public class ConsumerCoordinatorTest { this.partitionAssignor.clear(); client.setNode(node); - this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, autoCommitEnabled); + this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled); } @After @@ -735,7 +735,7 @@ public class ConsumerCoordinatorTest { final String consumerId = "consumer"; 
ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); subscriptions.needReassignment(); @@ -761,7 +761,7 @@ public class ConsumerCoordinatorTest { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); subscriptions.needReassignment(); @@ -789,7 +789,7 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignment() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 100); @@ -807,7 +807,7 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignmentCoordinatorUnknown() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.seek(tp, 100); @@ -1096,7 +1096,7 @@ public class ConsumerCoordinatorTest { try (Metrics metrics = new Metrics(time)) { ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range), - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false); List<ProtocolMetadata> metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(roundRobin.name(), metadata.get(0).name()); @@ -1105,7 +1105,7 @@ public class 
ConsumerCoordinatorTest { try (Metrics metrics = new Metrics(time)) { ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin), - ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false); List<ProtocolMetadata> metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(range.name(), metadata.get(0).name()); http://git-wip-us.apache.org/repos/asf/kafka/blob/62b9fa22/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 029adea..e6ebb96 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -20,7 +20,7 @@ package kafka.admin import java.util.Properties import joptsimple._ import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException} -import kafka.consumer.{ConsumerConfig, Whitelist} +import kafka.consumer.{ConsumerConfig => OldConsumerConfig, Whitelist} import kafka.coordinator.GroupCoordinator import kafka.log.{Defaults, LogConfig} import kafka.server.ConfigType @@ -31,6 +31,7 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils import scala.collection.JavaConversions._ import scala.collection._ +import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig} import org.apache.kafka.common.internals.TopicConstants @@ -383,30 +384,34 @@ object TopicCommand extends Logging { def shortMessageSizeWarning(maxMessageBytes: Int): String = { "\n\n" + "*****************************************************************************************************\n" + - "*** WARNING: you are creating a topic where the max.message.bytes is greater than the consumer ***\n" + - "*** default. 
This operation is potentially dangerous. Consumers will get failures if their ***\n" + - "*** fetch.message.max.bytes < the value you are using. ***\n" + + "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's ***\n" + + "*** default max.message.bytes. This operation is potentially dangerous. Consumers will get ***\n" + + s"*** failures if their fetch.message.max.bytes (old consumer) or ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} ***\n"+ + "*** (new consumer) < the value you are using. ***\n" + "*****************************************************************************************************\n" + s"- value set here: $maxMessageBytes\n" + - s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n" + + s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" + + s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n" + s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n\n" } def longMessageSizeWarning(maxMessageBytes: Int): String = { "\n\n" + - "****************************************************************************************************\n" + - "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker ***\n" + - "*** default. This operation is dangerous. There are two potential side effects: ***\n" + - "*** - Consumers will get failures if their fetch.message.max.bytes < the value you are using ***\n" + - "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have ***\n" + - "*** a higher risk of data loss ***\n" + - "*** You should ensure both of these settings are greater than the value set here before using ***\n" + - "*** this topic. 
***\n" + - "****************************************************************************************************\n" + + "*****************************************************************************************************\n" + + "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's ***\n" + + "*** default max.message.bytes. This operation is dangerous. There are two potential side effects: ***\n" + + "*** - Consumers will get failures if their fetch.message.max.bytes (old consumer) or ***\n" + + s"*** ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG} (new consumer) < the value you are using ***\n" + + "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have ***\n" + + "*** a higher risk of data loss ***\n" + + "*** You should ensure both of these settings are greater than the value set here before using ***\n" + + "*** this topic. ***\n" + + "*****************************************************************************************************\n" + s"- value set here: $maxMessageBytes\n" + s"- Default Broker replica.fetch.max.bytes: ${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" + s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n" + - s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n" + s"- Default Old Consumer fetch.message.max.bytes: ${OldConsumerConfig.FetchSize}\n" + + s"- Default New Consumer ${NewConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG}: ${NewConsumerConfig.DEFAULT_MAX_PARTITION_FETCH_BYTES}\n\n" } }
