Repository: kafka Updated Branches: refs/heads/trunk c1694833d -> f4d3d2865
KAFKA-3470: treat commits as member heartbeats Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma <[email protected]>, Guozhang Wang <[email protected]> Closes #1206 from hachikuji/KAFKA-3470 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4d3d286 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4d3d286 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4d3d286 Branch: refs/heads/trunk Commit: f4d3d2865894f1a1ade9d92ac27931fd35d16cae Parents: c169483 Author: Jason Gustafson <[email protected]> Authored: Wed Apr 13 18:09:08 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Apr 13 18:09:08 2016 -0700 ---------------------------------------------------------------------- .../kafka/coordinator/GroupCoordinator.scala | 22 +++- .../scala/kafka/server/DelayedOperation.scala | 68 ++++++----- .../scala/kafka/server/ReplicaManager.scala | 4 +- .../main/scala/kafka/utils/timer/Timer.scala | 51 +++++++- .../scala/kafka/utils/timer/TimerTask.scala | 2 +- .../scala/kafka/utils/timer/TimerTaskList.scala | 7 +- .../scala/kafka/utils/timer/TimingWheel.scala | 2 +- .../other/kafka/TestPurgatoryPerformance.scala | 2 +- .../GroupCoordinatorResponseTest.scala | 121 +++++++++++++++++-- .../kafka/server/DelayedOperationTest.scala | 2 +- .../test/scala/unit/kafka/utils/MockTime.scala | 12 ++ .../unit/kafka/utils/timer/MockTimer.scala | 57 +++++++++ .../kafka/utils/timer/TimerTaskListTest.scala | 12 +- .../unit/kafka/utils/timer/TimerTest.scala | 29 ++--- 14 files changed, 312 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 30a3a78..fb71254 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -51,6 +51,8 @@ class GroupCoordinator(val brokerId: Int, val groupConfig: GroupConfig, val offsetConfig: OffsetConfig, val groupManager: GroupMetadataManager, + val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat], + val joinPurgatory: DelayedOperationPurgatory[DelayedJoin], time: Time) extends Logging { type JoinCallback = JoinGroupResult => Unit type SyncCallback = (Array[Byte], Short) => Unit @@ -59,9 +61,6 @@ class GroupCoordinator(val brokerId: Int, private val isActive = new AtomicBoolean(false) - private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null - private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null - def offsetsTopicConfigs: Properties = { val props = new Properties props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) @@ -80,8 +79,6 @@ class GroupCoordinator(val brokerId: Int, */ def startup() { info("Starting up.") - heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) - joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId) isActive.set(true) info("Startup complete.") } @@ -414,6 +411,8 @@ class GroupCoordinator(val brokerId: Int, } else if (generationId != group.generationId) { responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) } else { + val member = group.get(memberId) + completeAndScheduleNextHeartbeatExpiration(group, member) delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)) } @@ -729,6 +728,17 @@ object GroupCoordinator { zkUtils: ZkUtils, replicaManager: ReplicaManager, time: Time): GroupCoordinator = { + val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId) + val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId) + apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time) + } + + def apply(config: KafkaConfig, + zkUtils: ZkUtils, + replicaManager: ReplicaManager, + heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat], + joinPurgatory: DelayedOperationPurgatory[DelayedJoin], + time: Time): GroupCoordinator = { val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize, loadBufferSize = config.offsetsLoadBufferSize, offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, @@ -741,7 +751,7 @@ object GroupCoordinator { groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs) val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time) - new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, time) + new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, heartbeatPurgatory, joinPurgatory, time) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/server/DelayedOperation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 0b53532..2205568 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -47,9 +47,7 @@ import com.yammer.metrics.core.Gauge * * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete(). */ -abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { - - override val expirationMs = delayMs + System.currentTimeMillis() +abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging { private val completed = new AtomicBoolean(false) @@ -110,19 +108,27 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging { } } +object DelayedOperationPurgatory { + + def apply[T <: DelayedOperation](purgatoryName: String, + brokerId: Int = 0, + purgeInterval: Int = 1000): DelayedOperationPurgatory[T] = { + val timer = new SystemTimer(purgatoryName) + new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval) + } + +} + /** * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations. */ -class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000) +class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, + timeoutTimer: Timer, + brokerId: Int = 0, + purgeInterval: Int = 1000, + reaperEnabled: Boolean = true) extends Logging with KafkaMetricsGroup { - // timeout timer - private[this] val executor = Executors.newFixedThreadPool(1, new ThreadFactory() { - def newThread(runnable: Runnable): Thread = - Utils.newThread("executor-"+purgatoryName, runnable, false) - }) - private[this] val timeoutTimer = new Timer(executor) - /* a list of operation watching keys */ private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key))) @@ -152,7 +158,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br metricsTags ) - expirationReaper.start() + if (reaperEnabled) + expirationReaper.start() /** * Check if the operation can be completed, if not watch it based on the given watch keys @@ -275,8 +282,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br * Shutdown the expire reaper thread */ def shutdown() { - expirationReaper.shutdown() - executor.shutdown() + if (reaperEnabled) + expirationReaper.shutdown() + timeoutTimer.shutdown() } /** @@ -338,6 +346,23 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br } } + def advanceClock(timeoutMs: Long) { + timeoutTimer.advanceClock(timeoutMs) + + // Trigger a purge if the number of completed but still being watched operations is larger than + // the purge threshold. That number is computed by the difference btw the estimated total number of + // operations and the number of pending delayed operations. + if (estimatedTotalOperations.get - delayed > purgeInterval) { + // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to + // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with + // a little overestimated total number of operations. + estimatedTotalOperations.getAndSet(delayed) + debug("Begin purging watch lists") + val purged = allWatchers.map(_.purgeCompleted()).sum + debug("Purged %d elements from watch lists.".format(purged)) + } + } + /** * A background reaper to expire delayed operations that have timed out */ @@ -346,20 +371,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br false) { override def doWork() { - timeoutTimer.advanceClock(200L) - - // Trigger a purge if the number of completed but still being watched operations is larger than - // the purge threshold. That number is computed by the difference btw the estimated total number of - // operations and the number of pending delayed operations. - if (estimatedTotalOperations.get - delayed > purgeInterval) { - // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to - // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with - // a little overestimated total number of operations. - estimatedTotalOperations.getAndSet(delayed) - debug("Begin purging watch lists") - val purged = allWatchers.map(_.purgeCompleted()).sum - debug("Purged %d elements from watch lists.".format(purged)) - } + advanceClock(200L) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 22657f4..9bbd29e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -122,9 +122,9 @@ class ReplicaManager(val config: KafkaConfig, private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis()) private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis()) - val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( + val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce]( purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests) - val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( + val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch]( purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) val leaderCount = newGauge( http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/Timer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala index bdd0e75..2d78665 100644 --- a/core/src/main/scala/kafka/utils/timer/Timer.scala +++ b/core/src/main/scala/kafka/utils/timer/Timer.scala @@ -16,14 +16,52 @@ */ package kafka.utils.timer -import java.util.concurrent.{DelayQueue, ExecutorService, TimeUnit} +import java.util.concurrent.{DelayQueue, Executors, ThreadFactory, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.utils.threadsafe +import org.apache.kafka.common.utils.Utils + +trait Timer { + /** + * Add a new task to this executor. It will be executed after the task's delay + * (beginning from the time of submission) + * @param timerTask the task to add + */ + def add(timerTask: TimerTask): Unit + + /** + * Advance the internal clock, executing any tasks whose expiration has been + * reached within the duration of the passed timeout. + * @param timeoutMs + * @return whether or not any tasks were executed + */ + def advanceClock(timeoutMs: Long): Boolean + + /** + * Get the number of tasks pending execution + * @return the number of tasks + */ + def size: Int + + /** + * Shutdown the timer service, leaving pending tasks unexecuted + */ + def shutdown(): Unit +} @threadsafe -class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = System.currentTimeMillis) { +class SystemTimer(executorName: String, + tickMs: Long = 1, + wheelSize: Int = 20, + startMs: Long = System.currentTimeMillis) extends Timer { + + // timeout timer + private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() { + def newThread(runnable: Runnable): Thread = + Utils.newThread("executor-"+executorName, runnable, false) + }) private[this] val delayQueue = new DelayQueue[TimerTaskList]() private[this] val taskCounter = new AtomicInteger(0) @@ -43,7 +81,7 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 def add(timerTask: TimerTask): Unit = { readLock.lock() try { - addTimerTaskEntry(new TimerTaskEntry(timerTask)) + addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis())) } finally { readLock.unlock() } @@ -82,6 +120,11 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20 } } - def size(): Int = taskCounter.get + def size: Int = taskCounter.get + + override def shutdown() { + taskExecutor.shutdown() + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimerTask.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala index d6b3a2e..6623854 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala @@ -18,7 +18,7 @@ package kafka.utils.timer trait TimerTask extends Runnable { - val expirationMs: Long // timestamp in millisecond + val delayMs: Long // timestamp in millisecond private[this] var timerTaskEntry: TimerTaskEntry = null http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index c4aeb5d..e862f4f 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -29,7 +29,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { // TimerTaskList forms a doubly linked cyclic list using a dummy root entry // root.next points to the head // root.prev points to the tail - private[this] val root = new TimerTaskEntry(null) + private[this] val root = new TimerTaskEntry(null, -1) root.next = root root.prev = root @@ -131,7 +131,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } -private[timer] class TimerTaskEntry(val timerTask: TimerTask) { +private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] { @volatile var list: TimerTaskList = null @@ -157,5 +157,8 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) { } } + override def compare(that: TimerTaskEntry): Int = { + this.expirationMs compare that.expirationMs + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimingWheel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala index f5b6efe..4535f3f 100644 --- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala +++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala @@ -123,7 +123,7 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta } def add(timerTaskEntry: TimerTaskEntry): Boolean = { - val expiration = timerTaskEntry.timerTask.expirationMs + val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { // Cancelled http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala index 744be3b..ba89fc8 100644 --- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala +++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala @@ -95,7 +95,7 @@ object TestPurgatoryPerformance { val latencySamples = new LatencySamples(1000000, pct75, pct50) val intervalSamples = new IntervalSamples(1000000, requestRate) - val purgatory = new DelayedOperationPurgatory[FakeOperation]("fake purgatory") + val purgatory = DelayedOperationPurgatory[FakeOperation]("fake purgatory") val queue = new CompletionQueue() val gcNames = gcMXBeans.map(_.getName) http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index acdb660..beab1b5 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -17,14 +17,14 @@ package kafka.coordinator +import kafka.utils.timer.MockTimer import org.apache.kafka.common.record.Record import org.junit.Assert._ -import kafka.common.{OffsetAndMetadata, TopicAndPartition} +import kafka.common.OffsetAndMetadata import kafka.message.{Message, MessageSet} -import kafka.server.{ReplicaManager, KafkaConfig} +import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig} import kafka.utils._ -import org.apache.kafka.common.utils.SystemTime -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{utils, TopicPartition} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse @@ -56,6 +56,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val ConsumerMinSessionTimeout = 10 val ConsumerMaxSessionTimeout = 1000 val DefaultSessionTimeout = 500 + var timer: MockTimer = null var groupCoordinator: GroupCoordinator = null var replicaManager: ReplicaManager = null var scheduler: KafkaScheduler = null @@ -87,7 +88,14 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret) EasyMock.replay(zkUtils) - groupCoordinator = GroupCoordinator(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime) + timer = new MockTimer + + val config = KafkaConfig.fromProps(props) + + val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false) + val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false) + + groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time) groupCoordinator.startup() // add the partition into the owned partition list @@ -284,6 +292,90 @@ class GroupCoordinatorResponseTest extends JUnitSuite { } @Test + def testSessionTimeout() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(replicaManager) + val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None) + EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes() + EasyMock.replay(replicaManager) + + timer.advanceClock(DefaultSessionTimeout + 100) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult) + } + + @Test + def testHeartbeatMaintainsSession() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val sessionTimeout = 1000 + + val joinGroupResult = joinGroup(groupId, memberId, sessionTimeout, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(replicaManager) + val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + timer.advanceClock(sessionTimeout / 2) + + EasyMock.reset(replicaManager) + var heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE.code, heartbeatResult) + + timer.advanceClock(sessionTimeout / 2 + 100) + + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE.code, heartbeatResult) + } + + @Test + def testCommitMaintainsSession() { + val sessionTimeout = 1000 + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val joinGroupResult = joinGroup(groupId, memberId, sessionTimeout, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(replicaManager) + val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + timer.advanceClock(sessionTimeout / 2) + + EasyMock.reset(replicaManager) + val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE.code, commitOffsetResult(tp)) + + timer.advanceClock(sessionTimeout / 2 + 100) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE.code, heartbeatResult) + } + + @Test def testSyncGroupEmptyAssignment() { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID @@ -459,7 +551,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite { // with no leader SyncGroup, the follower's request should failure with an error indicating // that it should rejoin EasyMock.reset(replicaManager) - val followerSyncFuture= sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId) + val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId) + + timer.advanceClock(DefaultSessionTimeout + 100) + val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100) assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2) } @@ -628,17 +723,20 @@ class GroupCoordinatorResponseTest extends JUnitSuite { @Test def testGenerationIdIncrementsOnRebalance() { - val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID - val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID - - val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType, protocols) val initialGenerationId = joinGroupResult.generationId val joinGroupErrorCode = joinGroupResult.errorCode + val memberId = joinGroupResult.memberId assertEquals(1, initialGenerationId) assertEquals(Errors.NONE.code, joinGroupErrorCode) EasyMock.reset(replicaManager) - val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols) + val syncGroupResult = syncGroupLeader(groupId, initialGenerationId, memberId, Map(memberId -> Array[Byte]())) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(replicaManager) + val otherJoinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) val nextGenerationId = otherJoinGroupResult.generationId val otherJoinGroupErrorCode = otherJoinGroupResult.errorCode assertEquals(2, nextGenerationId) @@ -860,6 +958,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { protocolType: String, protocols: List[(String, Array[Byte])]): JoinGroupResult = { val responseFuture = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols) + timer.advanceClock(10) // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala index df8d5b1..2c70137 100644 --- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala +++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala @@ -26,7 +26,7 @@ class DelayedOperationTest { @Before def setUp() { - purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock") + purgatory = DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock") } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/MockTime.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala index ee65748..0858e04 100644 --- a/core/src/test/scala/unit/kafka/utils/MockTime.scala +++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala @@ -19,6 +19,8 @@ package kafka.utils import java.util.concurrent._ +import org.apache.kafka.common.utils + /** * A class used for unit testing things which depend on the Time interface. * @@ -47,3 +49,13 @@ class MockTime(@volatile private var currentMs: Long) extends Time { override def toString() = "MockTime(%d)".format(milliseconds) } + +object MockTime { + implicit def toCommonTime(time: MockTime): utils.Time = new utils.Time { + override def nanoseconds(): Long = time.nanoseconds + + override def milliseconds(): Long = time.milliseconds + + override def sleep(ms: Long): Unit = time.sleep(ms) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala new file mode 100644 index 0000000..d18a060 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.utils.timer + +import kafka.utils.MockTime + +import scala.collection.mutable + +class MockTimer extends Timer { + + val time = new MockTime + private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]() + + def add(timerTask: TimerTask) { + if (timerTask.delayMs <= 0) + timerTask.run() + else + taskQueue.enqueue(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds)) + } + + def advanceClock(timeoutMs: Long): Boolean = { + time.sleep(timeoutMs) + + var executed = false + val now = time.milliseconds + + while (taskQueue.nonEmpty && now > taskQueue.head.expirationMs) { + val taskEntry = taskQueue.dequeue() + if (!taskEntry.cancelled) { + val task = taskEntry.timerTask + task.run() + executed = true + } + } + + executed + } + + def size: Int = taskQueue.size + + override def shutdown(): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala index a018dde..29c9067 100644 --- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala @@ -18,11 +18,11 @@ package kafka.utils.timer import org.junit.Assert._ import java.util.concurrent.atomic._ -import org.junit.{Test, After, Before} +import org.junit.Test class TimerTaskListTest { - private class TestTask(val expirationMs: Long) extends TimerTask { + private class TestTask(val delayMs: Long) extends TimerTask { def run(): Unit = { } } @@ -42,8 +42,8 @@ class TimerTaskListTest { val list3 = new TimerTaskList(sharedCounter) val tasks = (1 to 10).map { i => - val task = new TestTask(10L) - list1.add(new TimerTaskEntry(task)) + val task = new TestTask(0L) + list1.add(new TimerTaskEntry(task, 10L)) assertEquals(i, sharedCounter.get) task }.toSeq @@ -54,7 +54,7 @@ class TimerTaskListTest { tasks.take(4).foreach { task => val prevCount = sharedCounter.get // new TimerTaskEntry(task) will remove the existing entry from the list - list2.add(new TimerTaskEntry(task)) + list2.add(new TimerTaskEntry(task, 10L)) assertEquals(prevCount, sharedCounter.get) } assertEquals(10 - 4, size(list1)) @@ -66,7 +66,7 @@ class TimerTaskListTest { tasks.drop(4).foreach { task => val prevCount = sharedCounter.get // new TimerTaskEntry(task) will remove the existing entry from the list - list3.add(new TimerTaskEntry(task)) + list3.add(new TimerTaskEntry(task, 10L)) assertEquals(prevCount, sharedCounter.get) } assertEquals(0, size(list1)) http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala index 95de378..54b73b8 100644 --- a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala @@ -27,7 +27,7 @@ import scala.util.Random class TimerTest { - private class TestTask(override val expirationMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask { + private class TestTask(override val delayMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask { private[this] val completed = new AtomicBoolean(false) def run(): Unit = { if (completed.compareAndSet(false, true)) { @@ -37,32 +37,31 @@ class TimerTest { } } - private[this] var executor: ExecutorService = null + private[this] var timer: Timer = null @Before def setup() { - executor = Executors.newSingleThreadExecutor() + timer = new SystemTimer("test", tickMs = 1, wheelSize = 3) } @After def teardown(): Unit = { - executor.shutdown() - executor = null + timer.shutdown() } @Test def testAlreadyExpiredTask(): Unit = { - val startTime = System.currentTimeMillis() - val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime) val output = new ArrayBuffer[Int]() val latches = (-5 until 0).map { i => val latch = new CountDownLatch(1) - timer.add(new TestTask(startTime + i, i, latch, output)) + timer.add(new TestTask(i, i, latch, output)) latch } + timer.advanceClock(0) + latches.take(5).foreach { latch => assertEquals("already expired tasks should run immediately", true, latch.await(3, TimeUnit.SECONDS)) } @@ -72,8 +71,6 @@ class TimerTest { @Test def testTaskExpiration(): Unit = { - val startTime = System.currentTimeMillis() - val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime) val output = new ArrayBuffer[Int]() val tasks = new ArrayBuffer[TestTask]() @@ -82,27 +79,27 @@ class TimerTest { val latches = (0 until 5).map { i => val latch = new CountDownLatch(1) - tasks += new TestTask(startTime + i, i, latch, output) + tasks += new TestTask(i, i, latch, output) ids += i latch } ++ (10 until 100).map { i => val latch = new CountDownLatch(2) - tasks += new TestTask(startTime + i, i, latch, output) - tasks += new TestTask(startTime + i, i, latch, output) + tasks += new TestTask(i, i, latch, output) + tasks += new TestTask(i, i, latch, output) ids += i ids += i latch } ++ (100 until 500).map { i => val latch = new CountDownLatch(1) - tasks += new TestTask(startTime + i, i, latch, output) + tasks += new TestTask(i, i, latch, output) ids += i latch } // randomly submit requests - Random.shuffle(tasks.toSeq).foreach { task => timer.add(task) } + tasks.foreach { task => timer.add(task) } - while (timer.advanceClock(1000)) {} + while (timer.advanceClock(2000)) {} latches.foreach { latch => latch.await() }
