Repository: kafka
Updated Branches:
  refs/heads/trunk 764d8ca9e -> 02d4da5f6
KAFKA-2960 KAFKA-1148; Clear purgatory for partitions before becoming follower

Author: Jiangjie Qin <[email protected]>

Reviewers: Aditya Auradkar <[email protected]>, Ismael Juma <[email protected]>, Joel Koshy <[email protected]>, Jun Rao <[email protected]>, Guozhang Wang <[email protected]>

Closes #1018 from becketqin/KAFKA-2960


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02d4da5f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02d4da5f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02d4da5f

Branch: refs/heads/trunk
Commit: 02d4da5f64989b41358cdfd94d95b94fb4e20198
Parents: 764d8ca
Author: Jiangjie Qin <[email protected]>
Authored: Fri Mar 11 11:22:15 2016 -0800
Committer: Joel Koshy <[email protected]>
Committed: Fri Mar 11 11:22:15 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     |   8 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 109 ++++++++++++++++---
 2 files changed, 100 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/02d4da5f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5655313..de58e56 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -737,7 +737,8 @@ class ReplicaManager(val config: KafkaConfig,
    * 2. Mark the replicas as followers so that no more data can be added from the producer clients.
    * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads.
    * 4. Truncate the log and checkpoint offsets for these partitions.
-   * 5. If the broker is not shutting down, add the fetcher to the new leaders.
+   * 5. Clear the produce and fetch requests in the purgatory
+   * 6. If the broker is not shutting down, add the fetcher to the new leaders.
    *
    * The ordering of doing these steps make sure that the replicas in transition will not
    * take any more messages before checkpointing offsets so that all messages before the checkpoint
@@ -800,6 +801,11 @@ class ReplicaManager(val config: KafkaConfig,
       }

       logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
+      partitionsToMakeFollower.foreach { partition =>
+        val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId)
+        tryCompleteDelayedProduce(topicPartitionOperationKey)
+        tryCompleteDelayedFetch(topicPartitionOperationKey)
+      }

       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
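For readers not familiar with the purgatory that the new step 5 clears: delayed produce (acks=-1) and delayed fetch requests are parked per topic-partition until they can be satisfied, and this patch force-completes them when a partition becomes a follower, so waiting clients get NOT_LEADER_FOR_PARTITION promptly instead of sitting until the request timeout. The toy sketch below only illustrates that idea; ToyPurgatory and DelayedWrite are invented names for this example, not the actual DelayedOperationPurgatory classes the patch touches.

// Illustrative only: a toy stand-in for the purgatory mechanism, showing why pending
// operations keyed by topic-partition must be force-completed on a leader -> follower switch.
import scala.collection.mutable

object PurgatorySketch {

  // A pending acks=-1 produce that is parked until replication catches up or it is force-completed.
  final class DelayedWrite(val topicPartition: String) {
    @volatile var completed = false
    @volatile var error: Option[String] = None

    def forceComplete(err: Option[String]): Unit = {
      completed = true
      error = err
      println(s"$topicPartition completed with error=$error")
    }
  }

  // A toy purgatory: pending operations watched per topic-partition key.
  final class ToyPurgatory {
    private val watched = mutable.Map.empty[String, mutable.Buffer[DelayedWrite]]

    def watch(op: DelayedWrite): Unit =
      watched.getOrElseUpdate(op.topicPartition, mutable.Buffer.empty[DelayedWrite]) += op

    // The analogue of tryCompleteDelayedProduce/tryCompleteDelayedFetch above:
    // complete everything parked on this key and report how many were completed.
    def checkAndComplete(topicPartition: String, err: Option[String]): Int = {
      val ops = watched.remove(topicPartition).getOrElse(mutable.Buffer.empty[DelayedWrite])
      ops.foreach(_.forceComplete(err))
      ops.size
    }
  }

  def main(args: Array[String]): Unit = {
    val purgatory = new ToyPurgatory
    val pendingProduce = new DelayedWrite("test-topic-0")
    purgatory.watch(pendingProduce)

    // Leader -> follower transition for test-topic-0: without this call the pending
    // operation would sit in purgatory until its timeout expired; with it the caller
    // is answered immediately, which is what the new test below verifies.
    purgatory.checkAndComplete("test-topic-0", Some("NOT_LEADER_FOR_PARTITION"))
  }
}
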
http://git-wip-us.apache.org/repos/asf/kafka/blob/02d4da5f/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 32085f6..a5a8df1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -18,25 +18,28 @@
 package kafka.server

-import kafka.api.SerializationTestUtils
-import kafka.message.{Message, ByteBufferMessageSet}
-import kafka.utils.{ZkUtils, MockScheduler, MockTime, TestUtils}
-import org.apache.kafka.common.requests.ProduceRequest
-
-import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition}
 import org.easymock.EasyMock
-import org.I0Itec.zkclient.ZkClient
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test

-import scala.collection.Map
 import scala.collection.JavaConverters._
+import scala.collection.Map

 class ReplicaManagerTest {
@@ -47,9 +50,9 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, false)
+    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time: MockTime = new MockTime()
+    val time = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
@@ -73,7 +76,7 @@ class ReplicaManagerTest {
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time: MockTime = new MockTime()
+    val time = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
@@ -84,7 +87,7 @@ class ReplicaManagerTest {
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
-      rm.shutdown(false)
+      rm.shutdown(checkpointHW = false)
       metrics.close()
     }
   }
@@ -94,9 +97,9 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, false)
+    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time: MockTime = new MockTime()
+    val time = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
@@ -112,10 +115,84 @@ class ReplicaManagerTest {
       messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
       responseCallback = callback)
     } finally {
-      rm.shutdown(false)
+      rm.shutdown(checkpointHW = false)
       metrics.close()
     }

     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
+
+  @Test
+  def testClearPurgatoryOnBecomingFollower() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val time = new MockTime()
+    val jTime = new JMockTime
+    val metrics = new Metrics
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false))
+
+    try {
+      var produceCallbackFired = false
+      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
+        assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.errorCode)
+        produceCallbackFired = true
+      }
+
+      var fetchCallbackFired = false
+      def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = {
+        assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.error)
+        fetchCallbackFired = true
+      }
+
+      val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1))
+      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+      EasyMock.replay(metadataCache)
+
+      val brokerList : java.util.List[Integer] = Seq[Integer](0, 1).asJava
+      val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1).asJava
+
+      val partition = rm.getOrCreatePartition(topic, 0)
+      partition.getOrCreateReplica(0)
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava)
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.getLeaderReplicaIfLocal(topic, 0)
+
+      // Append a message.
+      rm.appendMessages(
+        timeout = 1000,
+        requiredAcks = -1,
+        internalTopicsAllowed = false,
+        messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
+        responseCallback = produceCallback)
+
+      // Fetch some messages
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = -1,
+        fetchMinBytes = 100000,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(0, 100000)),
+        responseCallback = fetchCallback)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava,
+        Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava)
+      rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {})
+
+      assertTrue(produceCallbackFired)
+      assertTrue(fetchCallbackFired)
+    } finally {
+      rm.shutdown(checkpointHW = false)
+      metrics.close()
+    }
+  }
 }
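From the client side, the effect of clearing the purgatory is simply a faster, retriable error when leadership moves. A minimal producer sketch of that behaviour follows; the bootstrap server localhost:9092 and the topic name "test-topic" are assumptions made for the example and are not part of this patch.

// Sketch of the client-visible effect, assuming a broker on localhost:9092 and an
// existing topic "test-topic" (both assumptions for this example).
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object NotLeaderSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("acks", "all") // acks=-1 requests are the ones parked in the produce purgatory
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("test-topic", "key", "value"), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        // If leadership moves while the request is parked in the purgatory, the broker now
        // answers promptly with NOT_LEADER_FOR_PARTITION (surfaced here as an exception the
        // client can retry) instead of holding the request until the full timeout expires.
        if (exception != null) println(s"send failed: $exception")
        else println(s"appended at offset ${metadata.offset}")
      }
    })
    producer.close()
  }
}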
