Repository: kafka Updated Branches: refs/heads/trunk ae2414127 -> ffb81a581
kafka-1616; Purgatory Size and Num.Delayed.Request metrics are incorrect; patched by Guozhang Wang; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffb81a58 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffb81a58 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffb81a58 Branch: refs/heads/trunk Commit: ffb81a581b6cbd3bd7338ec88022aa745a05b1c9 Parents: ae24141 Author: Guozhang Wang <[email protected]> Authored: Thu Sep 4 22:23:25 2014 -0700 Committer: Jun Rao <[email protected]> Committed: Thu Sep 4 22:23:25 2014 -0700 ---------------------------------------------------------------------- .../scala/kafka/server/RequestPurgatory.scala | 64 +++++++++++--------- .../kafka/server/RequestPurgatoryTest.scala | 33 +++++++++- 2 files changed, 67 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ffb81a58/core/src/main/scala/kafka/server/RequestPurgatory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala index ce06d2c..cf3ed4c 100644 --- a/core/src/main/scala/kafka/server/RequestPurgatory.scala +++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala @@ -71,9 +71,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt /* a list of requests watching each key */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) - /* the number of requests being watched, duplicates added on different watchers are also counted */ - private val watched = new AtomicInteger(0) - /* background thread expiring requests that have been waiting too long */ private val expiredRequestReaper = new ExpiredRequestReaper private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false) @@ -81,14 +78,14 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt newGauge( "PurgatorySize", new Gauge[Int] { - def value = watched.get() + expiredRequestReaper.numRequests + def value = watched() } ) newGauge( "NumDelayedRequests", new Gauge[Int] { - def value = expiredRequestReaper.unsatisfied.get() + def value = delayed() } ) @@ -130,6 +127,21 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt w.collectSatisfiedRequests() } + /* + * Return the size of the watched lists in the purgatory, which is the size of watch lists. + * Since an operation may still be in the watch lists even when it has been completed, + * this number may be larger than the number of real operations watched + */ + def watched() = watchersForKey.values.map(_.watched).sum + + /* + * Return the number of requests in the expiry reaper's queue + */ + def delayed() = expiredRequestReaper.delayed() + + /* + * Return the watch list for the given watch key + */ private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key) /** @@ -156,6 +168,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt private class Watchers { private val requests = new util.ArrayList[T] + // return the size of the watch list + def watched() = requests.size() + // potentially add the element to watch if it is not satisfied yet def checkAndMaybeAdd(t: T): Boolean = { synchronized { @@ -168,7 +183,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt return false } requests.add(t) - watched.getAndIncrement() return true } } @@ -182,7 +196,6 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val curr = iter.next if(curr.satisfied.get()) { iter.remove() - watched.getAndDecrement() purged += 1 } } @@ -206,11 +219,9 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt val satisfied = curr synchronized checkSatisfied(curr) if(satisfied) { iter.remove() - watched.getAndDecrement() val updated = curr.satisfied.compareAndSet(false, true) if(updated == true) { response += curr - expiredRequestReaper.satisfyRequest() } } } @@ -225,16 +236,13 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt */ private class ExpiredRequestReaper extends Runnable with Logging { this.logIdent = "ExpiredRequestReaper-%d ".format(brokerId) - - private val delayed = new DelayQueue[T] private val running = new AtomicBoolean(true) private val shutdownLatch = new CountDownLatch(1) - /* The count of elements in the delay queue that are unsatisfied */ - private [kafka] val unsatisfied = new AtomicInteger(0) - - def numRequests = delayed.size() + private val delayedQueue = new DelayQueue[T] + def delayed() = delayedQueue.size() + /** Main loop for the expiry thread */ def run() { while(running.get) { @@ -245,12 +253,17 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt expire(curr) } } - if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge - debug("Beginning purgatory purge") - val purged = purgeSatisfied() - debug("Purged %d requests from delay queue.".format(purged)) + // see if we need to purge the watch lists + if (RequestPurgatory.this.watched() >= purgeInterval) { + debug("Begin purging watch lists") val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum - debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers)) + debug("Purged %d elements from watch lists.".format(numPurgedFromWatchers)) + } + // see if we need to purge the delayed request queue + if (delayed() >= purgeInterval) { + debug("Begin purging delayed queue") + val purged = purgeSatisfied() + debug("Purged %d requests from delayed queue.".format(purged)) } } catch { case e: Exception => @@ -262,8 +275,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt /** Add a request to be expired */ def enqueue(t: T) { - delayed.add(t) - unsatisfied.incrementAndGet() + delayedQueue.add(t) } /** Shutdown the expiry thread*/ @@ -274,20 +286,16 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt debug("Shut down complete.") } - /** Record the fact that we satisfied a request in the stats for the expiry queue */ - def satisfyRequest(): Unit = unsatisfied.getAndDecrement() - /** * Get the next expired event */ private def pollExpired(): T = { while(true) { - val curr = delayed.poll(200L, TimeUnit.MILLISECONDS) + val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS) if (curr == null) return null.asInstanceOf[T] val updated = curr.satisfied.compareAndSet(false, true) if(updated) { - unsatisfied.getAndDecrement() return curr } } @@ -301,7 +309,7 @@ abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInt var purged = 0 // purge the delayed queue - val iter = delayed.iterator() + val iter = delayedQueue.iterator() while(iter.hasNext) { val curr = iter.next() if(curr.satisfied.get) { http://git-wip-us.apache.org/repos/asf/kafka/blob/ffb81a58/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala index 168712d..a577f4a 100644 --- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala @@ -34,7 +34,7 @@ class RequestPurgatoryTest extends JUnit3Suite { override def setUp() { super.setUp() - purgatory = new MockRequestPurgatory() + purgatory = new MockRequestPurgatory(5) } override def tearDown() { @@ -73,8 +73,37 @@ class RequestPurgatoryTest extends JUnit3Suite { assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2)) assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration) } + + @Test + def testRequestPurge() { + val r1 = new DelayedRequest(Array("test1"), null, 100000L) + val r12 = new DelayedRequest(Array("test1", "test2"), null, 100000L) + val r23 = new DelayedRequest(Array("test2", "test3"), null, 100000L) + purgatory.checkAndMaybeWatch(r1) + purgatory.checkAndMaybeWatch(r12) + purgatory.checkAndMaybeWatch(r23) + + assertEquals("Purgatory should have 5 watched elements", 5, purgatory.watched()) + assertEquals("Purgatory should have 3 total delayed requests", 3, purgatory.delayed()) + + // satisfy one of the requests, it should then be purged from the watch list with purge interval 5 + r12.satisfied.set(true) + TestUtils.waitUntilTrue(() => purgatory.watched() == 3, + "Purgatory should have 3 watched elements instead of " + + purgatory.watched(), 1000L) + TestUtils.waitUntilTrue(() => purgatory.delayed() == 3, + "Purgatory should still have 3 total delayed requests instead of " + purgatory.delayed(), 1000L) + + // add two more requests, then the satisfied request should be purged from the delayed queue with purge interval 5 + purgatory.checkAndMaybeWatch(r1) + purgatory.checkAndMaybeWatch(r1) + + TestUtils.waitUntilTrue(() => purgatory.watched() == 5, + "Purgatory should have 5 watched elements instead of " + purgatory.watched(), 1000L) + TestUtils.waitUntilTrue(() => purgatory.delayed() == 4, + "Purgatory should have 4 total delayed requests instead of " + purgatory.delayed(), 1000L) + } - class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] { + class MockRequestPurgatory(purge: Int) extends RequestPurgatory[DelayedRequest](purgeInterval = purge) { val satisfied = mutable.Set[DelayedRequest]() val expired = mutable.Set[DelayedRequest]() def awaitExpiration(delayed: DelayedRequest) = {
