Repository: kafka Updated Branches: refs/heads/trunk 403158b54 -> 44f6c4b94
KAFKA-2338; Warn on max.message.bytes change - Both TopicCommand and ConfigCommand warn if message.max.bytes increases - Log failures on the broker if replication gets stuck due to an oversized message - Added blocking call to warning. Author: benstopford <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #322 from benstopford/CPKAFKA-61 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44f6c4b9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44f6c4b9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44f6c4b9 Branch: refs/heads/trunk Commit: 44f6c4b946511ce4663d41bf40f2960d2faee198 Parents: 403158b Author: benstopford <[email protected]> Authored: Tue Oct 20 17:21:46 2015 -0700 Committer: Jun Rao <[email protected]> Committed: Tue Oct 20 17:21:46 2015 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/admin/ConfigCommand.scala | 16 +++++- .../main/scala/kafka/admin/TopicCommand.scala | 56 +++++++++++++++++++- .../kafka/server/ReplicaFetcherThread.scala | 9 ++++ 3 files changed, 78 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/admin/ConfigCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index ba4c003..a6984be 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -19,7 +19,9 @@ package kafka.admin import joptsimple._ import java.util.Properties -import kafka.log.LogConfig +import kafka.admin.TopicCommand._ +import kafka.consumer.ConsumerConfig +import kafka.log.{Defaults, LogConfig} import kafka.server.ConfigType import kafka.utils.{ZkUtils, CommandLineUtils} import org.I0Itec.zkclient.ZkClient @@ -67,6 +69,7 @@ object ConfigCommand { val configsToBeDeleted = parseConfigsToBeDeleted(opts) val entityType = opts.options.valueOf(opts.entityType) val entityName = opts.options.valueOf(opts.entityName) + warnOnMaxMessagesChange(configsToBeAdded) // compile the final set of configs val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName) @@ -82,6 +85,17 @@ object ConfigCommand { } } + def warnOnMaxMessagesChange(configs: Properties): Unit = { + val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match { + case n: String => n.toInt + case _ => -1 + } + if (maxMessageBytes > Defaults.MaxMessageSize){ + error(TopicCommand.longMessageSizeWarning(maxMessageBytes)) + TopicCommand.askToProceed + } + } + private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) { val entityType = opts.options.valueOf(opts.entityType) val entityNames: Seq[String] = http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 9fe2606..e6ca112 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -27,8 +27,8 @@ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ import scala.collection.JavaConversions._ -import kafka.log.LogConfig -import kafka.consumer.Whitelist +import kafka.log.{Defaults, LogConfig} +import kafka.consumer.{ConsumerConfig, Whitelist} import kafka.server.{ConfigType, OffsetManager} import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.security.JaasUtils @@ -96,11 +96,13 @@ object TopicCommand extends Logging { println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.") if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) + warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false) } else { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) val partitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue + warnOnMaxMessagesChange(configs, replicas) AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs) } println("Created topic \"%s\".".format(topic)) @@ -333,4 +335,54 @@ object TopicCommand extends Logging { allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt) } } + def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = { + val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match { + case n: String => n.toInt + case _ => -1 + } + if (maxMessageBytes > Defaults.MaxMessageSize) + if (replicas > 1) { + error(longMessageSizeWarning(maxMessageBytes)) + askToProceed + } + else + warn(shortMessageSizeWarning(maxMessageBytes)) + } + + def askToProceed: Unit = { + println("Are you sure you want to continue? [y/n]") + if (!Console.readLine().equalsIgnoreCase("y")) { + println("Ending your session") + System.exit(0) + } + } + + def shortMessageSizeWarning(maxMessageBytes: Int): String = { + "\n\n" + + "*****************************************************************************************************\n" + + "*** WARNING: you are creating a topic where the the max.message.bytes is greater than the consumer ***\n" + + "*** default. This operation is potentially dangerous. Consumers will get failures if their ***\n" + + "*** fetch.message.max.bytes < the value you are using. ***\n" + + "*****************************************************************************************************\n" + + s"- value set here: $maxMessageBytes\n" + + s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n" + + s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n\n" + } + + def longMessageSizeWarning(maxMessageBytes: Int): String = { + "\n\n" + + "****************************************************************************************************\n" + + "*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker ***\n" + + "*** default. This operation is dangerous. There are two potential side effects: ***\n" + + "*** - Consumers will get failures if their fetch.message.max.bytes < the value you are using ***\n" + + "*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have ***\n" + + "*** a higher risk of data loss ***\n" + + "*** You should ensure both of these settings are greater than the value set here before using ***\n" + + "*** this topic. ***\n" + + "****************************************************************************************************\n" + + s"- value set here: $maxMessageBytes\n" + + s"- Default Broker replica.fetch.max.bytes: ${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" + + s"- Default Broker max.message.bytes: ${kafka.server.Defaults.MessageMaxBytes}\n" + + s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n" + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5aa817d..5993bbb 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -99,6 +99,7 @@ class ReplicaFetcherThread(name: String, val TopicAndPartition(topic, partitionId) = topicAndPartition val replica = replicaMgr.getReplica(topic, partitionId).get val messageSet = partitionData.toByteBufferMessageSet + warnIfMessageOversized(messageSet) if (fetchOffset != replica.logEndOffset.messageOffset) throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset)) @@ -121,6 +122,14 @@ class ReplicaFetcherThread(name: String, } } + def warnIfMessageOversized(messageSet: ByteBufferMessageSet): Unit = { + if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0) + error("Replication is failing due to a message that is greater than replica.fetch.max.bytes. This " + + "generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " + + "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " + + "equal or larger than your settings for max.message.bytes, both at a broker and topic level.") + } + /** * Handle a partition whose offset is out of range and return a new fetch offset. */
