This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2903606 KAFKA-7801: TopicCommand should not be able to alter transaction topic partition count 2903606 is described below commit 290360692114baa74f6ca1963b6700c38594d876 Author: huxihx <huxi...@hotmail.com> AuthorDate: Tue Mar 12 11:35:48 2019 +0530 KAFKA-7801: TopicCommand should not be able to alter transaction topic partition count To stay aligned with the way it handles the offsets topic, TopicCommand should not be able to alter the transaction topic's partition count. Author: huxihx <huxi...@hotmail.com> Reviewers: Viktor Somogyi <viktorsomo...@gmail.com>, Ismael Juma <ism...@juma.me.uk>, Manikumar Reddy <manikumar.re...@gmail.com> Closes #6109 from huxihx/KAFKA-7801 --- core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++-- .../scala/unit/kafka/admin/TopicCommandTest.scala | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index a4fa20f..0c26fc9 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -318,8 +318,8 @@ object TopicCommand extends Logging { } if(tp.hasPartitions) { - if (topic == Topic.GROUP_METADATA_TOPIC_NAME) { - throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") + if (Topic.INTERNAL_TOPICS.contains(topic)) { + throw new IllegalArgumentException(s"The number of partitions for the internal topics${Topic.INTERNAL_TOPICS} cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + "logic or ordering of the messages will be affected") diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 008fd66..76ed423 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ 
b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -569,4 +569,26 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT assertFalse(TestUtils.grabConsoleOutput(topicService.deleteTopic(escapedCommandOpts)).contains(topic2)) assertTrue(TestUtils.grabConsoleOutput(topicService.deleteTopic(unescapedCommandOpts)).contains(topic2)) } + + @Test + def testAlterInternalTopicPartitionCount(): Unit = { + val brokers = List(0) + TestUtils.createBrokersInZk(zkClient, brokers) + + // create internal topics + adminZkClient.createTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, 1) + adminZkClient.createTopic(Topic.TRANSACTION_STATE_TOPIC_NAME, 1, 1) + + def expectAlterInternalTopicPartitionCountFailed(topic: String): Unit = { + try { + topicService.alterTopic(new TopicCommandOptions( + Array("--topic", topic, "--partitions", "2"))) + fail("Should have thrown an IllegalArgumentException") + } catch { + case _: IllegalArgumentException => // expected + } + } + expectAlterInternalTopicPartitionCountFailed(Topic.GROUP_METADATA_TOPIC_NAME) + expectAlterInternalTopicPartitionCountFailed(Topic.TRANSACTION_STATE_TOPIC_NAME) + } }