This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new bda2a02 KAFKA-4893; Fix deletion and moving of topics with long names bda2a02 is described below commit bda2a02ce7db26a126c99cc477ef64c9e7133b70 Author: Colin P. Mccabe <cmcc...@confluent.io> AuthorDate: Tue Jun 4 14:14:32 2019 -0700 KAFKA-4893; Fix deletion and moving of topics with long names Author: Colin P. Mccabe <cmcc...@confluent.io> Reviewers: Gwen Shapira, David Arthur, James Cheng, Vahid Hashemian Closes #6869 from cmccabe/KAFKA-4893 (cherry picked from commit e6563aab722b35c4984b77e9eee42a1904cd1ea6) Signed-off-by: Gwen Shapira <csh...@gmail.com> --- core/src/main/scala/kafka/log/Log.scala | 15 ++++++++++----- core/src/main/scala/kafka/server/ReplicaManager.scala | 8 +++++++- .../kafka/api/AdminClientIntegrationTest.scala | 18 ++++++++++++++++++ .../test/scala/unit/kafka/log/LogManagerTest.scala | 9 ++++++++- core/src/test/scala/unit/kafka/log/LogTest.scala | 19 ++++++++++++++++++- 5 files changed, 61 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 149a4f0..01b12c8 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2173,8 +2173,8 @@ object Log { /** a directory that is used for future partition */ val FutureDirSuffix = "-future" - private val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix") - private val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix") + private[log] val DeleteDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$DeleteDirSuffix") + private[log] val FutureDirPattern = Pattern.compile(s"^(\\S+)-(\\S+)\\.(\\S+)$FutureDirSuffix") val UnknownLogStartOffset = -1L @@ -2220,11 +2220,16 @@ object Log { new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix + suffix) /** - * Return a directory name to rename the log directory to for async deletion. The name will be in the following - * format: topic-partition.uniqueId-delete where topic, partition and uniqueId are variables. + * Return a directory name to rename the log directory to for async deletion. + * The name will be in the following format: "topic-partitionId.uniqueId-delete". + * If the topic name is too long, it will be truncated to prevent the total name + * from exceeding 255 characters. */ def logDeleteDirName(topicPartition: TopicPartition): String = { - logDirNameWithSuffix(topicPartition, DeleteDirSuffix) + val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "") + val suffix = s"-${topicPartition.partition()}.${uniqueId}${DeleteDirSuffix}" + val prefixLength = Math.min(topicPartition.topic().size, 255 - suffix.size) + s"${topicPartition.topic().substring(0, prefixLength)}${suffix}" } /** diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0aaa0bc..352ef64 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -574,6 +574,11 @@ class ReplicaManager(val config: KafkaConfig, replicaStateChangeLock synchronized { partitionDirs.map { case (topicPartition, destinationDir) => try { + /* If the topic name is exceptionally long, we can't support altering the log directory. + * See KAFKA-4893 for details. + * TODO: fix this by implementing topic IDs. */ + if (Log.logFutureDirName(topicPartition).size > 255) + throw new InvalidTopicException("The topic name is too long.") if (!logManager.isLogDirOnline(destinationDir)) throw new KafkaStorageException(s"Log directory $destinationDir is offline") @@ -614,7 +619,8 @@ class ReplicaManager(val config: KafkaConfig, (topicPartition, Errors.NONE) } catch { - case e@(_: LogDirNotFoundException | + case e@(_: InvalidTopicException | + _: LogDirNotFoundException | _: ReplicaNotAvailableException | _: KafkaStorageException) => (topicPartition, Errors.forException(e)) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 08b2b01..8f8a679 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -1408,6 +1408,24 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { assertEquals(2, currentLeader(partition1)) assertEquals(2, currentLeader(partition2)) } + + @Test + def testLongTopicNames(): Unit = { + val client = AdminClient.create(createConfig) + val longTopicName = String.join("", Collections.nCopies(249, "x")); + val invalidTopicName = String.join("", Collections.nCopies(250, "x")); + val newTopics2 = Seq(new NewTopic(invalidTopicName, 3, 3), + new NewTopic(longTopicName, 3, 3)) + val results = client.createTopics(newTopics2.asJava).values() + assertTrue(results.containsKey(longTopicName)) + results.get(longTopicName).get() + assertTrue(results.containsKey(invalidTopicName)) + assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException]) + assertFutureExceptionTypeEquals(client.alterReplicaLogDirs( + Map(new TopicPartitionReplica(longTopicName, 0, 0) -> servers(0).config.logDirs(0)).asJava).all(), + classOf[InvalidTopicException]) + client.close() + } } object AdminClientIntegrationTest { diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 812dada..1c37ddb 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -18,7 +18,7 @@ package kafka.log import java.io._ -import java.util.Properties +import java.util.{Collections, Properties} import kafka.server.FetchDataInfo import kafka.server.checkpoints.OffsetCheckpointFile @@ -376,6 +376,13 @@ class LogManagerTest { } @Test + def testCreateAndDeleteOverlyLongTopic(): Unit = { + val invalidTopicName = String.join("", Collections.nCopies(253, "x")); + val log = logManager.getOrCreateLog(new TopicPartition(invalidTopicName, 0), logConfig) + logManager.asyncDelete(new TopicPartition(invalidTopicName, 0)) + } + + @Test def testCheckpointForOnlyAffectedLogs() { val tps = Seq( new TopicPartition("test-a", 0), diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 2733019..f7388ba 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -20,7 +20,8 @@ package kafka.log import java.io._ import java.nio.ByteBuffer import java.nio.file.{Files, Paths} -import java.util.{Optional, Properties} +import java.util.regex.Pattern +import java.util.{Collections, Optional, Properties} import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0} import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} @@ -75,6 +76,22 @@ class LogTest { } @Test + def testLogDeleteDirName(): Unit = { + val name1 = Log.logDeleteDirName(new TopicPartition("foo", 3)) + assertTrue(name1.length <= 255) + assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches()) + assertTrue(Log.DeleteDirPattern.matcher(name1).matches()) + assertFalse(Log.FutureDirPattern.matcher(name1).matches()) + val name2 = Log.logDeleteDirName( + new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5)) + System.out.println("name2 = " + name2) + assertEquals(255, name2.length) + assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches()) + assertTrue(Log.DeleteDirPattern.matcher(name2).matches()) + assertFalse(Log.FutureDirPattern.matcher(name2).matches()) + } + + @Test def testOffsetFromFile() { val offset = 23423423L