Repository: kafka
Updated Branches:
  refs/heads/trunk 403158b54 -> 44f6c4b94


KAFKA-2338; Warn on max.message.bytes change

- Both TopicCommand and ConfigCommand warn if the topic-level max.message.bytes
is increased above the broker default
- Log failures on the broker if replication gets stuck due to an oversized
message
- Added a blocking confirmation prompt after the warning so the user must acknowledge it.

Author: benstopford <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #322 from benstopford/CPKAFKA-61


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44f6c4b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44f6c4b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44f6c4b9

Branch: refs/heads/trunk
Commit: 44f6c4b946511ce4663d41bf40f2960d2faee198
Parents: 403158b
Author: benstopford <[email protected]>
Authored: Tue Oct 20 17:21:46 2015 -0700
Committer: Jun Rao <[email protected]>
Committed: Tue Oct 20 17:21:46 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/ConfigCommand.scala  | 16 +++++-
 .../main/scala/kafka/admin/TopicCommand.scala   | 56 +++++++++++++++++++-
 .../kafka/server/ReplicaFetcherThread.scala     |  9 ++++
 3 files changed, 78 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index ba4c003..a6984be 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -19,7 +19,9 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
-import kafka.log.LogConfig
+import kafka.admin.TopicCommand._
+import kafka.consumer.ConsumerConfig
+import kafka.log.{Defaults, LogConfig}
 import kafka.server.ConfigType
 import kafka.utils.{ZkUtils, CommandLineUtils}
 import org.I0Itec.zkclient.ZkClient
@@ -67,6 +69,7 @@ object ConfigCommand {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
     val entityType = opts.options.valueOf(opts.entityType)
     val entityName = opts.options.valueOf(opts.entityName)
+    warnOnMaxMessagesChange(configsToBeAdded)
 
     // compile the final set of configs
     val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
@@ -82,6 +85,17 @@ object ConfigCommand {
     }
   }
 
+  def warnOnMaxMessagesChange(configs: Properties): Unit = {
+    val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
+      case n: String => n.toInt
+      case _ => -1
+    }
+    if (maxMessageBytes > Defaults.MaxMessageSize){
+      error(TopicCommand.longMessageSizeWarning(maxMessageBytes))
+      TopicCommand.askToProceed
+    }
+  }
+
   private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
     val entityType = opts.options.valueOf(opts.entityType)
     val entityNames: Seq[String] =

http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 9fe2606..e6ca112 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -27,8 +27,8 @@ import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
 import scala.collection.JavaConversions._
-import kafka.log.LogConfig
-import kafka.consumer.Whitelist
+import kafka.log.{Defaults, LogConfig}
+import kafka.consumer.{ConsumerConfig, Whitelist}
 import kafka.server.{ConfigType, OffsetManager}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.security.JaasUtils
@@ -96,11 +96,13 @@ object TopicCommand extends Logging {
       println("WARNING: Due to limitations in metric names, topics with a 
period ('.') or underscore ('_') could collide. To avoid issues it is best to 
use either, but not both.")
     if (opts.options.has(opts.replicaAssignmentOpt)) {
       val assignment = 
parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+      warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, 
topic, assignment, configs, update = false)
     } else {
       CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, 
opts.partitionsOpt, opts.replicationFactorOpt)
       val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
       val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+      warnOnMaxMessagesChange(configs, replicas)
       AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
     }
     println("Created topic \"%s\".".format(topic))
@@ -333,4 +335,54 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) + 
reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
     }
   }
+  def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = {
+    val maxMessageBytes =  configs.get(LogConfig.MaxMessageBytesProp) match {
+      case n: String => n.toInt
+      case _ => -1
+    }
+    if (maxMessageBytes > Defaults.MaxMessageSize)
+      if (replicas > 1) {
+        error(longMessageSizeWarning(maxMessageBytes))
+        askToProceed
+      }
+      else
+        warn(shortMessageSizeWarning(maxMessageBytes))
+  }
+
+  def askToProceed: Unit = {
+    println("Are you sure you want to continue? [y/n]")
+    if (!Console.readLine().equalsIgnoreCase("y")) {
+      println("Ending your session")
+      System.exit(0)
+    }
+  }
+
+  def shortMessageSizeWarning(maxMessageBytes: Int): String = {
+    "\n\n" +
+      
"*****************************************************************************************************\n"
 +
+      "*** WARNING: you are creating a topic where the the max.message.bytes 
is greater than the consumer ***\n" +
+      "*** default. This operation is potentially dangerous. Consumers will 
get failures if their        ***\n" +
+      "*** fetch.message.max.bytes < the value you are using.                  
                          ***\n" +
+      
"*****************************************************************************************************\n"
 +
+      s"- value set here: $maxMessageBytes\n" +
+      s"- Default Consumer fetch.message.max.bytes: 
${ConsumerConfig.FetchSize}\n" +
+      s"- Default Broker max.message.bytes: 
${kafka.server.Defaults.MessageMaxBytes}\n\n"
+  }
+
+  /**
+   * Banner shown (followed by a blocking confirmation prompt) when max.message.bytes
+   * is raised above the broker default on a replicated topic: consumers may fail and
+   * replication may stall if replica.fetch.max.bytes is smaller than this value.
+   */
+  def longMessageSizeWarning(maxMessageBytes: Int): String = {
+    "\n\n" +
+      
"****************************************************************************************************\n"
 +
+      "*** WARNING: you are creating a topic where the max.message.bytes is 
greater than the broker      ***\n" +
+      "*** default. This operation is dangerous. There are two potential side 
effects:                  ***\n" +
+      "*** - Consumers will get failures if their fetch.message.max.bytes < 
the value you are using     ***\n" +
+      "*** - Producer requests larger than replica.fetch.max.bytes will not 
replicate and hence have    ***\n" +
+      "***   a higher risk of data loss                                        
                          ***\n" +
+      "*** You should ensure both of these settings are greater than the value 
set here before using    ***\n" +
+      "*** this topic.                                                         
                          ***\n" +
+      
"****************************************************************************************************\n"
 +
+      s"- value set here: $maxMessageBytes\n" +
+      s"- Default Broker replica.fetch.max.bytes: 
${kafka.server.Defaults.ReplicaFetchMaxBytes}\n" +
+      s"- Default Broker max.message.bytes: 
${kafka.server.Defaults.MessageMaxBytes}\n" +
+      s"- Default Consumer fetch.message.max.bytes: 
${ConsumerConfig.FetchSize}\n\n"
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/44f6c4b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5aa817d..5993bbb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -99,6 +99,7 @@ class ReplicaFetcherThread(name: String,
       val TopicAndPartition(topic, partitionId) = topicAndPartition
       val replica = replicaMgr.getReplica(topic, partitionId).get
       val messageSet = partitionData.toByteBufferMessageSet
+      warnIfMessageOversized(messageSet)
 
       if (fetchOffset != replica.logEndOffset.messageOffset)
         throw new RuntimeException("Offset mismatch: fetched offset = %d, log 
end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
@@ -121,6 +122,14 @@ class ReplicaFetcherThread(name: String,
     }
   }
 
+  def warnIfMessageOversized(messageSet: ByteBufferMessageSet): Unit = {
+    if (messageSet.sizeInBytes > 0 && messageSet.validBytes <= 0)
+      error("Replication is failing due to a message that is greater than 
replica.fetch.max.bytes. This " +
+        "generally occurs when the max.message.bytes has been overridden to 
exceed this value and a suitably large " +
+        "message has also been sent. To fix this problem increase 
replica.fetch.max.bytes in your broker config to be " +
+        "equal or larger than your settings for max.message.bytes, both at a 
broker and topic level.")
+  }
+
   /**
    * Handle a partition whose offset is out of range and return a new fetch 
offset.
    */

Reply via email to