Repository: kafka
Updated Branches:
  refs/heads/trunk 3a048e80d -> df449a24a


KAFKA-1473; transient unit test failure in
testRequestHandlingDuringDeleteTopic; patched by Guozhang Wang; reviewed by
Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df449a24
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df449a24
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df449a24

Branch: refs/heads/trunk
Commit: df449a24a7bd7b87502cab93c847a0dfea12c6db
Parents: 3a048e8
Author: Guozhang Wang <[email protected]>
Authored: Wed Jun 4 13:33:23 2014 -0700
Committer: Jun Rao <[email protected]>
Committed: Wed Jun 4 13:33:23 2014 -0700

----------------------------------------------------------------------
 .../unit/kafka/admin/DeleteTopicTest.scala      | 67 ++------------------
 1 file changed, 7 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/df449a24/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index a821d60..1b3c04e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -76,7 +76,7 @@ class DeleteTopicTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     val servers = createTestTopicAndCluster(topic)
     val controllerId = ZkUtils.getController(zkClient)
     val controller = servers.filter(s => s.config.brokerId == 
controllerId).head
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && 
s.config.brokerId != controllerId).last
     follower.shutdown()
 
@@ -97,62 +97,6 @@ class DeleteTopicTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   }
 
   @Test
-  def testRequestHandlingDuringDeleteTopic() {
-    val topicAndPartition = TopicAndPartition("test", 0)
-    val topic = topicAndPartition.topic
-    val servers = createTestTopicAndCluster(topic)
-    // start topic deletion
-    AdminUtils.deleteTopic(zkClient, topic)
-    // shut down one follower replica
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
-    assertTrue("Leader should exist for partition [test,0]", 
leaderIdOpt.isDefined)
-    val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
-    follower.shutdown()
-    // test if produce requests are failed with 
UnknownTopicOrPartitionException during delete topic
-    val props1 = new Properties()
-    props1.put("metadata.broker.list", servers.map(s => s.config.hostName + 
":" + s.config.port).mkString(","))
-    props1.put("serializer.class", "kafka.serializer.StringEncoder")
-    props1.put("request.required.acks", "1")
-    val producerConfig1 = new ProducerConfig(props1)
-    val producer1 = new Producer[String, String](producerConfig1)
-    try {
-      producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
-      fail("Test should fail because the topic is being deleted")
-    } catch {
-      case e: FailedToSendMessageException =>
-      case oe: Throwable => fail("fails with exception", oe)
-    } finally {
-      producer1.close()
-    }
-    // test if fetch requests fail during delete topic
-    val availableServers: Seq[KafkaServer] = servers.filter(s => 
s.config.brokerId != follower.config.brokerId).toSeq
-    availableServers.foreach {
-      server =>
-        val consumer = new SimpleConsumer(server.config.hostName, 
server.config.port, 1000000, 64 * 1024, "")
-        val request = new FetchRequestBuilder()
-          .clientId("test-client")
-          .addFetch(topic, 0, 0, 10000)
-          .build()
-        val fetched = consumer.fetch(request)
-        val fetchResponse = fetched.data(topicAndPartition)
-        assertEquals("Fetch should fail with UnknownTopicOrPartitionCode", 
ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.error)
-    }
-    // test if offset requests fail during delete topic
-    availableServers.foreach {
-      server =>
-        val consumer = new SimpleConsumer(server.config.hostName, 
server.config.port, 1000000, 64 * 1024, "")
-        val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
-        val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
-        val errorCode = 
offsetResponse.partitionErrorAndOffsets(topicAndPartition).error
-        assertEquals("Offset request should fail with 
UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, 
errorCode)
-    }
-    // restart follower replica
-    follower.startup()
-    verifyTopicDeletion(topic, availableServers)
-    servers.foreach(_.shutdown())
-  }
-
-  @Test
   def testPartitionReassignmentDuringDeleteTopic() {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topic = "test"
@@ -168,7 +112,7 @@ class DeleteTopicTest extends JUnit3Suite with 
ZooKeeperTestHarness {
     TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
       res && server.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     assertTrue("Leader should exist for partition [test,0]", 
leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
     follower.shutdown()
@@ -202,7 +146,7 @@ class DeleteTopicTest extends JUnit3Suite with 
ZooKeeperTestHarness {
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"
     val servers = createTestTopicAndCluster(topic)
-    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     assertTrue("Leader should exist for partition [test,0]", 
leaderIdOpt.isDefined)
     val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
     val newPartition = TopicAndPartition(topic, 1)
@@ -334,8 +278,11 @@ class DeleteTopicTest extends JUnit3Suite with 
ZooKeeperTestHarness {
       "Admin path /admin/delete_topic/test path not deleted even after a 
replica is restarted")
     TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, 
ZkUtils.getTopicPath(topic)),
       "Topic path /brokers/topics/test not deleted after 
/admin/delete_topic/test path is deleted")
+    // ensure that the topic-partition has been deleted from all brokers' 
replica managers
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res 
&& server.replicaManager.getPartition(topic, 0) == None),
+      "Replica manager's should have deleted all of this topic's partitions")
     // ensure that logs from all replicas are deleted if delete topic is 
marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && 
server.getLogManager().getLog(topicAndPartition).isEmpty))
   }
-}
\ No newline at end of file
+}

Reply via email to