[ https://issues.apache.org/jira/browse/KAFKA-6973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498044#comment-16498044 ]
ASF GitHub Bot commented on KAFKA-6973: --------------------------------------- ijuma closed pull request #5106: KAFKA-6973: TopicCommand should verify topic-level config URL: https://github.com/apache/kafka/pull/5106 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 0db49e76e2e..158209a1fc0 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -254,7 +254,7 @@ object LogConfig { KafkaConfig.LogPreAllocateProp) .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc, KafkaConfig.LogMessageFormatVersionProp) - .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, MEDIUM, MessageTimestampTypeDoc, + .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc, KafkaConfig.LogMessageTimestampTypeProp) .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc, KafkaConfig.LogMessageTimestampDifferenceMaxMsProp) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 6a276df1e7c..782fcf539f3 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -225,4 +225,21 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT assertTrue(output.contains(topic) && output.contains(markedForDeletionList)) } + @Test + def testInvalidTopicLevelConfig(): Unit = { + val brokers = 
List(0) + TestUtils.createBrokersInZk(zkClient, brokers) + + // create the topic + try { + val createOpts = new TopicCommandOptions( + Array("--partitions", "1", "--replication-factor", "1", "--topic", "test", + "--config", "message.timestamp.type=boom")) + TopicCommand.createTopic(zkClient, createOpts) + fail("Expected exception on invalid topic-level config.") + } catch { + case _: Exception => // topic creation should fail due to the invalid config + } + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > setting invalid timestamp causes Kafka broker restart to fail > ------------------------------------------------------------- > > Key: KAFKA-6973 > URL: https://issues.apache.org/jira/browse/KAFKA-6973 > Project: Kafka > Issue Type: Bug > Components: admin > Affects Versions: 1.1.0 > Reporter: Paul Brebner > Assignee: huxihx > Priority: Critical > > Setting the timestamp type to an invalid value causes the Kafka broker to fail upon startup. > E.g. > ./kafka-topics.sh --create --zookeeper localhost --topic duck3 --partitions 1 > --replication-factor 1 --config message.timestamp.type=boom > > Also note that the docs say the parameter name is > log.message.timestamp.type, but this is silently ignored. > The command above succeeds with no error despite the invalid timestamp value, but the next time you > restart Kafka: > > [2018-05-29 13:09:05,806] FATAL [KafkaServer id=0] Fatal error during > KafkaServer startup. 
Prepare to shutdown (kafka.server.KafkaServer) > java.util.NoSuchElementException: Invalid timestamp type boom > at org.apache.kafka.common.record.TimestampType.forName(TimestampType.java:39) > at kafka.log.LogConfig.<init>(LogConfig.scala:94) > at kafka.log.LogConfig$.fromProps(LogConfig.scala:279) > at kafka.log.LogManager$$anonfun$17.apply(LogManager.scala:786) > at kafka.log.LogManager$$anonfun$17.apply(LogManager.scala:785) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at kafka.log.LogManager$.apply(LogManager.scala:785) > at kafka.server.KafkaServer.startup(KafkaServer.scala:222) > at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38) > at kafka.Kafka$.main(Kafka.scala:92) > at kafka.Kafka.main(Kafka.scala) > [2018-05-29 13:09:05,811] INFO [KafkaServer id=0] shutting down > -- This message was sent by Atlassian JIRA (v7.6.3#76005)