Repository: kafka Updated Branches: refs/heads/trunk d43666102 -> 143a33bc5
http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 69e83c0..b14464f 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -40,7 +40,7 @@ class ClientQuotaManagerTest { @Test def testQuotaParsing() { - val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time) + val clientMetrics = new ClientQuotaManager(config, newMetrics, QuotaType.Produce, time) // Case 1: Update the quota. Assert that the new quota value is returned clientMetrics.updateQuota("p1", new Quota(2000, true)) @@ -77,8 +77,8 @@ class ClientQuotaManagerTest { @Test def testQuotaViolation() { val metrics = newMetrics - val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) - val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "producer", "")) + val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time) + val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", "")) try { /* We have 10 second windows. Make sure that there is no quota violation * if we produce under the quota @@ -125,16 +125,16 @@ class ClientQuotaManagerTest { @Test def testExpireThrottleTimeSensor() { val metrics = newMetrics - val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) + val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time) try { clientMetrics.recordAndMaybeThrottle("client1", 100, callback) // remove the throttle time sensor - metrics.removeSensor("producerThrottleTime-client1") + metrics.removeSensor("ProduceThrottleTime-client1") // should not throw an exception even if the throttle time sensor does not exist. val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback) assertTrue("Should be throttled", throttleTime > 0) // the sensor should get recreated - val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1") + val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-client1") assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) } finally { clientMetrics.shutdown() @@ -144,21 +144,21 @@ class ClientQuotaManagerTest { @Test def testExpireQuotaSensors() { val metrics = newMetrics - val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) + val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time) try { clientMetrics.recordAndMaybeThrottle("client1", 100, callback) // remove all the sensors - metrics.removeSensor("producerThrottleTime-client1") - metrics.removeSensor("producer-client1") + metrics.removeSensor("ProduceThrottleTime-client1") + metrics.removeSensor("Produce-client1") // should not throw an exception val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback) assertTrue("Should be throttled", throttleTime > 0) // all the sensors should get recreated - val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1") + val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-client1") assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) - val byteRateSensor = metrics.getSensor("producer-client1") + val byteRateSensor = metrics.getSensor("Produce-client1") assertTrue("Byte rate sensor should exist", byteRateSensor != null) } finally { clientMetrics.shutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index a9df929..bf11332 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -1,32 +1,32 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package kafka.server import java.util.Properties -import org.apache.kafka.common.protocol.ApiKeys +import kafka.log.LogConfig._ +import kafka.server.Constants._ import org.junit.Assert._ import org.apache.kafka.common.metrics.Quota -import org.easymock.{Capture, EasyMock} +import org.easymock.EasyMock import org.junit.Test import kafka.integration.KafkaServerTestHarness import kafka.utils._ import kafka.common._ -import kafka.log.LogConfig import kafka.admin.{AdminOperationException, AdminUtils} import scala.collection.Map @@ -37,19 +37,19 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def testConfigChange() { assertTrue("Should contain a ConfigHandler for topics", - this.servers.head.dynamicConfigHandlers.contains(ConfigType.Topic)) + this.servers.head.dynamicConfigHandlers.contains(ConfigType.Topic)) val oldVal: java.lang.Long = 100000L val newVal: java.lang.Long = 200000L val tp = TopicAndPartition("test", 0) val logProps = new Properties() - logProps.put(LogConfig.FlushMessagesProp, oldVal.toString) + logProps.put(FlushMessagesProp, oldVal.toString) AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps) TestUtils.retry(10000) { val logOpt = this.servers.head.logManager.getLog(tp) assertTrue(logOpt.isDefined) assertEquals(oldVal, logOpt.get.config.flushInterval) } - logProps.put(LogConfig.FlushMessagesProp, newVal.toString) + logProps.put(FlushMessagesProp, newVal.toString) AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps) TestUtils.retry(10000) { assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval) @@ -59,21 +59,21 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def testClientQuotaConfigChange() { assertTrue("Should contain a ConfigHandler for topics", - this.servers.head.dynamicConfigHandlers.contains(ConfigType.Client)) + this.servers.head.dynamicConfigHandlers.contains(ConfigType.Client)) val clientId = "testClient" val props = new Properties() props.put(ClientConfigOverride.ProducerOverride, "1000") props.put(ClientConfigOverride.ConsumerOverride, "2000") AdminUtils.changeClientIdConfig(zkUtils, clientId, props) - val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers + val quotaManagers = servers.head.apis.quotas TestUtils.retry(10000) { - val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId) - val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId) + val overrideProducerQuota = quotaManagers.produce.quota(clientId) + val overrideConsumerQuota = quotaManagers.fetch.quota(clientId) assertEquals(s"ClientId $clientId must have overridden producer quota of 1000", Quota.upperBound(1000), overrideProducerQuota) - assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000", + assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000", Quota.upperBound(2000), overrideConsumerQuota) } @@ -84,8 +84,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { AdminUtils.changeClientIdConfig(zkUtils, clientId, new Properties()) TestUtils.retry(10000) { - val producerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId) - val consumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId) + val producerQuota = quotaManagers.produce.quota(clientId) + val consumerQuota = quotaManagers.fetch.quota(clientId) assertEquals(s"ClientId $clientId must have reset producer quota to " + defaultProducerQuota, Quota.upperBound(defaultProducerQuota), producerQuota) @@ -99,7 +99,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { val topic = TestUtils.tempTopic try { val logProps = new Properties() - logProps.put(LogConfig.FlushMessagesProp, 10000: java.lang.Integer) + logProps.put(FlushMessagesProp, 10000: java.lang.Integer) AdminUtils.changeTopicConfig(zkUtils, topic, logProps) fail("Should fail with AdminOperationException for topic doesn't exist") } catch { @@ -162,4 +162,47 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Verify that processConfigChanges was only called once EasyMock.verify(handler) } -} + + @Test + def shouldParseReplicationQuotaProperties { + val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) + val props: Properties = new Properties() + + //Given + props.put(ThrottledReplicasListProp, "0:101,0:102,1:101,1:102") + + //When/Then + assertEquals(Seq(0,1), configHandler.parseThrottledPartitions(props, 102)) + assertEquals(Seq(), configHandler.parseThrottledPartitions(props, 103)) + } + + @Test + def shouldParseWildcardReplicationQuotaProperties { + val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) + val props: Properties = new Properties() + + //Given + props.put(ThrottledReplicasListProp, "*") + + //When + val result = configHandler.parseThrottledPartitions(props, 102) + + //Then + assertEquals(AllReplicas, result) + } + + @Test + def shouldParseReplicationQuotaReset { + val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, null) + val props: Properties = new Properties() + + //Given + props.put(ThrottledReplicasListProp, "") + + //When + val result = configHandler.parseThrottledPartitions(props, 102) + + //Then + assertEquals(Seq(), result) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index f5b515b..e7e1554 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -18,14 +18,15 @@ package kafka.server import kafka.log._ import java.io.File +import kafka.utils.SystemTime import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.utils.{Utils, MockTime => JMockTime} +import org.apache.kafka.common.utils.{MockTime => JMockTime, Utils} import org.easymock.EasyMock import org.junit._ import org.junit.Assert._ import kafka.common._ import kafka.cluster.Replica -import kafka.utils.{KafkaScheduler, MockTime, SystemTime, TestUtils, ZkUtils} +import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} import java.util.concurrent.atomic.AtomicBoolean class HighwatermarkPersistenceTest { @@ -56,7 +57,7 @@ class HighwatermarkPersistenceTest { val metrics = new Metrics // create replica manager val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime, zkUtils, scheduler, - logManagers.head, new AtomicBoolean(false)) + logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() @@ -99,7 +100,7 @@ class HighwatermarkPersistenceTest { val metrics = new Metrics // create replica manager val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkUtils, - scheduler, logManagers.head, new AtomicBoolean(false)) + scheduler, logManagers.head, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower) replicaManager.startup() try { replicaManager.checkpointHighWatermarks() http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index c34e4f0..540a665 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -52,7 +52,7 @@ class IsrExpirationTest { @Before def setUp() { - replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false)) + replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, null, null, null, new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, SystemTime).follower) } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala new file mode 100644 index 0000000..69365aa --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + + +import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean + +import kafka.api._ +import kafka.cluster.Replica +import kafka.common.TopicAndPartition +import kafka.log.Log +import kafka.message.{ByteBufferMessageSet, Message} +import kafka.utils._ +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.{MockTime => JMockTime} +import org.easymock.EasyMock +import org.easymock.EasyMock._ +import org.junit.Assert._ +import org.junit.{After, Test} + + +class ReplicaManagerQuotasTest { + val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, new Properties())) + val time = new MockTime + val jTime = new JMockTime + val metrics = new Metrics + val message = new Message("some-data-in-a-message".getBytes()) + val topicAndPartition1 = TopicAndPartition("test-topic", 1) + val topicAndPartition2 = TopicAndPartition("test-topic", 2) + val fetchInfo = Map(topicAndPartition1 -> PartitionFetchInfo(0, 100), topicAndPartition2 -> PartitionFetchInfo(0, 100)) + var replicaManager: ReplicaManager = null + + @Test + def shouldExcludeSubsequentThrottledPartitions(): Unit = { + setUpMocks(fetchInfo) + + val quota = mockQuota(1000000) + expect(quota.isQuotaExceeded()).andReturn(false).once() + expect(quota.isQuotaExceeded()).andReturn(true).once() + replay(quota) + + val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota) + assertEquals("Given two partitions, with only one throttled, we should get the first", 1, + fetch.get(topicAndPartition1).get.info.messageSet.size) + + assertEquals("But we shouldn't get the second", 0, + fetch.get(topicAndPartition2).get.info.messageSet.size) + } + + @Test + def shouldGetNoMessagesIfQuotasExceededOnSubsequentPartitions(): Unit = { + setUpMocks(fetchInfo) + + val quota = mockQuota(1000000) + expect(quota.isQuotaExceeded()).andReturn(true).once() + expect(quota.isQuotaExceeded()).andReturn(true).once() + replay(quota) + + val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota) + assertEquals("Given two partitions, with both throttled, we should get no messages", 0, + fetch.get(topicAndPartition1).get.info.messageSet.size) + assertEquals("Given two partitions, with both throttled, we should get no messages", 0, + fetch.get(topicAndPartition2).get.info.messageSet.size) + } + + @Test + def shouldGetBothMessagesIfQuotasAllow(): Unit = { + setUpMocks(fetchInfo) + + val quota = mockQuota(1000000) + expect(quota.isQuotaExceeded()).andReturn(false).once() + expect(quota.isQuotaExceeded()).andReturn(false).once() + replay(quota) + + val fetch = replicaManager.readFromLocalLog(true, true, fetchInfo, quota) + assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, + fetch.get(topicAndPartition1).get.info.messageSet.size) + assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, + fetch.get(topicAndPartition2).get.info.messageSet.size) + } + + def setUpMocks(fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], message: Message = this.message) { + val zkUtils = createNiceMock(classOf[ZkUtils]) + val scheduler = createNiceMock(classOf[KafkaScheduler]) + + //Create log which handles both a regular read and a 0 bytes read + val log = createMock(classOf[Log]) + expect(log.logEndOffset).andReturn(20L).anyTimes() + expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes() + + //if we ask for len 1 return a message + expect(log.read(anyObject(), geq(1), anyObject())).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(0L, 0L, 0), + new ByteBufferMessageSet(message) + )).anyTimes() + + //if we ask for len = 0, return 0 messages + expect(log.read(anyObject(), EasyMock.eq(0), anyObject())).andReturn( + new FetchDataInfo( + new LogOffsetMetadata(0L, 0L, 0), + new ByteBufferMessageSet() + )).anyTimes() + replay(log) + + //Create log manager + val logManager = createMock(classOf[kafka.log.LogManager]) + + //Return the same log for each partition as it doesn't matter + expect(logManager.getLog(anyObject())).andReturn(Some(log)).anyTimes() + replay(logManager) + + replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler, logManager, + new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower) + + //create the two replicas + for (p <- fetchInfo.keySet) { + val partition = replicaManager.getOrCreatePartition(p.topic, p.partition) + val replica = new Replica(configs.head.brokerId, partition, time, 0, Some(log)) + replica.highWatermark = new LogOffsetMetadata(5) + partition.leaderReplicaIdOpt = Some(replica.brokerId) + val allReplicas = List(replica) + allReplicas.foreach(partition.addReplicaIfNotExists(_)) + partition.inSyncReplicas = allReplicas.toSet + } + } + + @After + def tearDown() { + replicaManager.shutdown(false) + metrics.close() + } + + def mockQuota(bound: Long): ReplicaQuota = { + val quota = createMock(classOf[ReplicaQuota]) + expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes() + quota + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index bfb66b9..47e5461 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -66,7 +66,7 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower) try { val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) @@ -84,7 +84,7 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower) try { val partition = rm.getOrCreatePartition(topic, 1) partition.getOrCreateReplica(1) @@ -101,7 +101,7 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), Option(this.getClass.getName)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName)) try { def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = { assert(responseStatus.values.head.errorCode == Errors.INVALID_REQUIRED_ACKS.code) @@ -126,7 +126,7 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower) try { var produceCallbackFired = false @@ -195,7 +195,7 @@ class ReplicaManagerTest { val config = KafkaConfig.fromProps(props) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, - new AtomicBoolean(false), Option(this.getClass.getName)) + new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName)) try { val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1), new Broker(1, "host2", 2)) val metadataCache = EasyMock.createMock(classOf[MetadataCache]) http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala new file mode 100644 index 0000000..5c41372 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import java.util.Collections + +import kafka.common.TopicAndPartition +import kafka.server.QuotaType._ +import kafka.server.{QuotaType, ReplicationQuotaManager, ReplicationQuotaManagerConfig} +import org.apache.kafka.common.metrics.{Quota, MetricConfig, Metrics} +import org.apache.kafka.common.utils.MockTime +import org.junit.Assert.{assertFalse, assertTrue, assertEquals} +import org.junit.Test +import scala.collection.JavaConverters._ + +class ReplicationQuotaManagerTest { + private val time = new MockTime + + @Test + def shouldThrottleOnlyDefinedReplicas() { + val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics, QuotaType.Fetch, time) + quota.markThrottled("topic1", Seq(1, 2, 3)) + + assertTrue(quota.isThrottled(tp1(1))) + assertTrue(quota.isThrottled(tp1(2))) + assertTrue(quota.isThrottled(tp1(3))) + assertFalse(quota.isThrottled(tp1(4))) + } + + @Test + def shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses(): Unit = { + val metrics = newMetrics() + val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(numQuotaSamples = 10, quotaWindowSizeSeconds = 1), metrics, LeaderReplication, time) + + //Given + quota.updateQuota(new Quota(100, true)) + + //Quota should not be broken when we start + assertFalse(quota.isQuotaExceeded()) + + //First window is fixed, so we'll skip it + time.sleep(1000) + + //When we record up to the quota value after half a window + time.sleep(500) + quota.record(1) + + //Then it should not break the quota + assertFalse(quota.isQuotaExceeded()) + + //When we record half the quota (half way through the window), we still should not break + quota.record(149) //150B, 1.5s + assertFalse(quota.isQuotaExceeded()) + + //Add a byte to push over quota + quota.record(1) //151B, 1.5s + + //Then it should break the quota + assertEquals(151 / 1.5, rate(metrics), 0) //151B, 1.5s + assertTrue(quota.isQuotaExceeded()) + + //When we sleep for the remaining half the window + time.sleep(500) //151B, 2s + + //Then Our rate should have halved (i.e back down below the quota) + assertFalse(quota.isQuotaExceeded()) + assertEquals(151d / 2, rate(metrics), 0.1) //151B, 2s + + //When we sleep for another half a window (now half way through second window) + time.sleep(500) + quota.record(99) //250B, 2.5s + + //Then the rate should be exceeded again + assertEquals(250 / 2.5, rate(metrics), 0) //250B, 2.5s + assertFalse(quota.isQuotaExceeded()) + quota.record(1) + assertTrue(quota.isQuotaExceeded()) + assertEquals(251 / 2.5, rate(metrics), 0) + + //Sleep for 2 more window + time.sleep(2 * 1000) //so now at 3.5s + assertFalse(quota.isQuotaExceeded()) + assertEquals(251d / 4.5, rate(metrics), 0) + } + + def rate(metrics: Metrics): Double = { + val metricName = metrics.metricName("byte-rate", LeaderReplication.toString, "Tracking byte-rate for " + LeaderReplication) + val leaderThrottledRate = metrics.metrics.asScala(metricName).value() + leaderThrottledRate + } + + @Test + def shouldSupportWildcardThrottledReplicas(): Unit = { + val quota = new ReplicationQuotaManager(ReplicationQuotaManagerConfig(), newMetrics, LeaderReplication, time) + + //When + quota.markThrottled("MyTopic") + + //Then + assertTrue(quota.isThrottled(TopicAndPartition("MyTopic", 0))) + assertFalse(quota.isThrottled(TopicAndPartition("MyOtherTopic", 0))) + } + + private def tp1(id: Int): TopicAndPartition = new TopicAndPartition("topic1", id) + + private def newMetrics(): Metrics = { + new Metrics(new MetricConfig(), Collections.emptyList(), time) + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala new file mode 100644 index 0000000..af7c4c8 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.server + +import java.util.Properties + +import kafka.admin.AdminUtils +import kafka.admin.AdminUtils._ +import kafka.common._ +import kafka.log.LogConfig._ +import kafka.server.KafkaConfig.fromProps +import kafka.server.QuotaType._ +import kafka.server._ +import kafka.utils.TestUtils +import kafka.utils.TestUtils._ +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.junit.Assert._ +import org.junit.{After, Before, Test} +import scala.collection.JavaConverters._ + +/** + * This is the main test which ensure Replication Quotas work correctly. + * + * The test will fail if the quota is < 1MB/s as 1MB is the default for replica.fetch.max.bytes. + * So with a throttle of 100KB/s, 1 fetch of 1 partition would fill 10s of quota. In turn causing + * the throttled broker to pause for > 10s + * + * Anything over 100MB/s tends to fail as this is the non-throttled replication rate + */ + +class ReplicationQuotasTest extends ZooKeeperTestHarness { + def percentError(percent: Int, value: Long): Long = Math.round(value * percent / 100) + + val msg100KB = new Array[Byte](100000) + var brokers: Seq[KafkaServer] = null + val topic = "topic1" + var producer: KafkaProducer[Array[Byte], Array[Byte]] = null + + @Before + override def setUp() { + super.setUp() + } + + @After + override def tearDown() { + brokers.par.foreach(_.shutdown()) + producer.close() + super.tearDown() + } + + @Test + def shouldBootstrapTwoBrokersWithLeaderThrottle(): Unit = { + shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(true) + } + + @Test + def shouldBootstrapTwoBrokersWithFollowerThrottle(): Unit = { + shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(false) + } + + def shouldMatchQuotaReplicatingThroughAnAsymmetricTopology(leaderThrottle: Boolean): Unit = { + /** + * In short we have 8 brokers, 2 are not-started. We assign replicas for the two non-started + * brokers, so when we start them we can monitor replication from the 6 to the 2. + * + * We also have two non-throttled partitions on two of the 6 brokers, just to make sure + * regular replication works as expected. + */ + + brokers = (100 to 105).map { id => TestUtils.createServer(fromProps(createBrokerConfig(id, zkConnect))) } + + //Given six partitions, lead on nodes 0,1,2,3,4,5 but will followers on node 6,7 (not started yet) + //And two extra partitions 6,7, which we don't intend on throttling + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map( + 0 -> Seq(100, 106), //Throttled + 1 -> Seq(101, 106), //Throttled + 2 -> Seq(102, 106), //Throttled + 3 -> Seq(103, 107), //Throttled + 4 -> Seq(104, 107), //Throttled + 5 -> Seq(105, 107), //Throttled + 6 -> Seq(100, 106), //Not Throttled + 7 -> Seq(101, 107) //Not Throttled + )) + + val msg = msg100KB + val msgCount: Int = 1000 + val expectedDuration = 10 //Keep the test to N seconds + var throttle: Long = msgCount * msg.length / expectedDuration + if (!leaderThrottle) throttle = throttle * 3 //Follower throttle needs to replicate 3x as fast to get the same duration as there are three replicas to replicate for each of the two follower brokers + + //Set the throttle limit on all 8 brokers, but only assign throttled replicas to the six leaders, or two followers + (100 to 107).foreach { brokerId => + changeBrokerConfig(zkUtils, Seq(brokerId), property(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString)) + } + if (leaderThrottle) + changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:100,1:101,2:102,3:103,4:104,5:105")) //partition-broker:... throttle the 6 leaders + else + changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:106,1:106,2:106,3:107,4:107,5:107")) //partition-broker:... throttle the two followers + + //Add data equally to each partition + producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), retries = 5, acks = 0) + (0 until msgCount).foreach { x => + (0 to 7).foreach { partition => + producer.send(new ProducerRecord(topic, partition, null, msg)).get + } + } + + //Ensure data is fully written: broker 1 has partition 1, broker 2 has partition 2 etc + (0 to 5).foreach { id => waitForOffsetsToMatch(msgCount, id, 100 + id) } + //Check the non-throttled partitions too + waitForOffsetsToMatch(msgCount, 6, 100) + waitForOffsetsToMatch(msgCount, 7, 101) + + val start = System.currentTimeMillis() + + //When we create the 2 new, empty brokers + brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(106, zkConnect))) + brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(107, zkConnect))) + + //Check that throttled config correctly migrated to the new brokers + (106 to 107).foreach { brokerId => + assertEquals(throttle, brokerFor(brokerId).quotaManagers.follower.upperBound()) + } + if (!leaderThrottle) { + (0 to 2).foreach { partition => + assertTrue(brokerFor(106).quotaManagers.follower.isThrottled(new TopicAndPartition(topic, partition))) + } + (3 to 5).foreach { partition => + assertTrue(brokerFor(107).quotaManagers.follower.isThrottled(new TopicAndPartition(topic, partition))) + } + } + + //Wait for non-throttled partitions to replicate first + (6 to 7).foreach { id => waitForOffsetsToMatch(msgCount, id, 100 + id) } + val unthrottledTook = System.currentTimeMillis() - start + + //Wait for replicas 0,1,2,3,4,5 to fully replicated to broker 106,107 + (0 to 2).foreach { id => waitForOffsetsToMatch(msgCount, id, 106) } + (3 to 5).foreach { id => waitForOffsetsToMatch(msgCount, id, 107) } + + val throttledTook = System.currentTimeMillis() - start + + //Check the recorded throttled rate is what we expect + if (leaderThrottle) { + (100 to 105).map(brokerFor(_)).foreach { broker => + val metricName = broker.metrics.metricName("byte-rate", LeaderReplication.toString, "Tracking byte-rate for" + LeaderReplication) + val measuredRate = broker.metrics.metrics.asScala(metricName).value() + info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded Rate was:$measuredRate") + assertEquals(throttle, measuredRate, percentError(25, throttle)) + } + } else { + (106 to 107).map(brokerFor(_)).foreach { broker => + val metricName = broker.metrics.metricName("byte-rate", FollowerReplication.toString, "Tracking byte-rate for" + FollowerReplication) + val measuredRate = broker.metrics.metrics.asScala(metricName).value() + info(s"Broker:${broker.config.brokerId} Expected:$throttle, Recorded Rate was:$measuredRate") + assertEquals(throttle, measuredRate, percentError(25, throttle)) + } + } + + //Check the times for throttled/unthrottled are each side of what we expect + info(s"Unthrottled took: $unthrottledTook, Throttled took: $throttledTook, for expeted $expectedDuration secs") + assertTrue(s"Unthrottled replication of ${unthrottledTook}ms should be < ${expectedDuration * 1000}ms", + unthrottledTook < expectedDuration * 1000) + assertTrue((s"Throttled replication of ${throttledTook}ms should be > ${expectedDuration * 1000}ms"), + throttledTook > expectedDuration * 1000) + assertTrue((s"Throttled replication of ${throttledTook}ms should be < ${expectedDuration * 1500}ms"), + throttledTook < expectedDuration * 1000 * 1.5) + } + + @Test + def shouldThrottleOldSegments(): Unit = { + /** + * Simple test which ensures throttled replication works when the dataset spans many segments + */ + + //2 brokers with 1MB Segment Size & 1 partition + val config: Properties = createBrokerConfig(100, zkConnect) + config.put("log.segment.bytes", (1024 * 1024).toString) + brokers = Seq(TestUtils.createServer(fromProps(config))) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101))) + + //Write 20MBs and throttle at 5MB/s + val msg = msg100KB + val msgCount: Int = 200 + val expectedDuration = 4 + val throttle: Long = msg.length * msgCount / expectedDuration + + //Set the throttle limit leader + changeBrokerConfig(zkUtils, Seq(100), property(KafkaConfig.ThrottledReplicationRateLimitProp, throttle.toString)) + changeTopicConfig(zkUtils, topic, property(ThrottledReplicasListProp, "0:100")) + + //Add data + addData(msgCount, msg) + + val start = System.currentTimeMillis() + + //Start the new broker (and hence start replicating) + brokers = brokers :+ TestUtils.createServer(fromProps(createBrokerConfig(101, zkConnect))) + waitForOffsetsToMatch(msgCount, 0, 101) + + val throttledTook = System.currentTimeMillis() - start + + assertTrue((s"Throttled replication of ${throttledTook}ms should be > ${expectedDuration * 1000 * 0.9}ms"), + throttledTook > expectedDuration * 1000 * 0.9) + assertTrue((s"Throttled replication of ${throttledTook}ms should be < ${expectedDuration * 1500}ms"), + throttledTook < expectedDuration * 1000 * 1.5) + } + + def addData(msgCount: Int, msg: Array[Byte]): Boolean = { + producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), retries = 5, acks = 0) + (0 until msgCount).foreach { x => producer.send(new ProducerRecord(topic, msg)).get } + waitForOffsetsToMatch(msgCount, 0, 100) + } + + private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Boolean = { + waitUntilTrue(() => { + offset == brokerFor(brokerId).getLogManager.getLog(TopicAndPartition(topic, partitionId)).map(_.logEndOffset).getOrElse(0) + }, s"Offsets did not match for partition $partitionId on broker $brokerId", 60000) + } + + private def property(key: String, value: String) = { + new Properties() { put(key, value) } + } + + private def brokerFor(id: Int): KafkaServer = brokers.filter(_.config.brokerId == id)(0) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 7741698..1052be5 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -16,23 +16,23 @@ */ package kafka.server +import java.util.concurrent.atomic.AtomicBoolean +import java.util.{Collections, Properties} + import kafka.api._ -import kafka.utils._ import kafka.cluster.Replica import kafka.common.TopicAndPartition import kafka.log.Log -import kafka.message.{MessageSet, ByteBufferMessageSet, Message} +import kafka.message.{ByteBufferMessageSet, Message, MessageSet} +import kafka.server.QuotaFactory.UnboundedQuota +import kafka.utils._ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.{MockTime => JMockTime} -import org.junit.{Test, After, Before} - -import java.util.{Properties, Collections} -import java.util.concurrent.atomic.AtomicBoolean -import collection.JavaConversions._ - import org.easymock.EasyMock -import org.I0Itec.zkclient.ZkClient import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.collection.JavaConversions._ class SimpleFetchTest { @@ -99,7 +99,7 @@ class SimpleFetchTest { // create the replica manager replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler, logManager, - new AtomicBoolean(false)) + new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower) // add the partition with two replicas, both in ISR val partition = replicaManager.getOrCreatePartition(topic, partitionId) @@ -148,9 +148,9 @@ class SimpleFetchTest { val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count() assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW, - replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) + replicaManager.readFromLocalLog(true, true, fetchInfo, UnboundedQuota).get(topicAndPartition).get.info.messageSet.head.message) assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO, - replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) + replicaManager.readFromLocalLog(true, false, fetchInfo, UnboundedQuota).get(topicAndPartition).get.info.messageSet.head.message) assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()) assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()) http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala index 778f3f8..aae66d8 100644 --- a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala @@ -43,7 +43,7 @@ class ThrottledResponseExpirationTest { @Test def testExpire() { - val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, "producer", time) + val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, QuotaType.Produce, time) val delayQueue = new DelayQueue[ThrottledResponse]() val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue) http://git-wip-us.apache.org/repos/asf/kafka/blob/143a33bc/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 131a24a..dadc8a3 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -933,18 +933,24 @@ object TestUtils extends Logging { def produceMessages(servers: Seq[KafkaServer], topic: String, - numMessages: Int): Seq[String] = { + numMessages: Int, + acks: Int = 0, + valueBytes: Int = -1): Seq[Array[Byte]] = { val producer = createNewProducer( TestUtils.getBrokerListStrFromServers(servers), retries = 5, - requestTimeoutMs = 2000 + requestTimeoutMs = 2000, + acks = acks ) - val values = (0 until numMessages).map(x => s"test-$x") + val values = (0 until numMessages).map(x => valueBytes match { + case -1 => s"test-$x".getBytes + case _ => new Array[Byte](valueBytes) + }) val futures = values.map { value => - producer.send(new ProducerRecord(topic, null, null, value.getBytes)) + producer.send(new ProducerRecord(topic, value)) } futures.foreach(_.get) producer.close()
