Repository: incubator-gearpump Updated Branches: refs/heads/master ae55efbdd -> 0bc65218f
[GEARPUMP-291] Refactor TaskActor Author: manuzhang <[email protected]> Closes #170 from manuzhang/refactor_task. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/0bc65218 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/0bc65218 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/0bc65218 Branch: refs/heads/master Commit: 0bc65218f39962cf237d4b56b95c06ce2536ea1c Parents: ae55efb Author: manuzhang <[email protected]> Authored: Fri Mar 10 23:02:18 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Mar 10 23:06:28 2017 +0800 ---------------------------------------------------------------------- .../streaming/appmaster/ClockService.scala | 14 ++- .../gearpump/streaming/task/TaskActor.scala | 113 +++++++++++-------- .../streaming/task/TaskControlMessage.scala | 4 +- .../streaming/appmaster/AppMasterSpec.scala | 7 +- .../streaming/appmaster/ClockServiceSpec.scala | 5 +- 5 files changed, 88 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index b1f0b23..77a966a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -37,7 +37,6 @@ import org.apache.gearpump.streaming.task._ import org.apache.gearpump.util.LogUtil import org.slf4j.Logger -import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.language.implicitConversions @@ -194,7 +193,7 @@ class ClockService( private def getUpstreamClocks( clocks: Map[ProcessorId, ProcessorClock]): Map[ProcessorId, Array[ProcessorClock]] = { clocks.foldLeft(Map.empty[ProcessorId, Array[ProcessorClock]]) { - case (accum, (processorId, clock)) => + case (accum, (processorId, _)) => val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1) if (upstreams.nonEmpty) { val upstreamClocks = upstreams.collect(clocks) @@ -215,7 +214,7 @@ class ClockService( def clockService: Receive = { case GetUpstreamMinClock(task) => - getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_)) + sendBackUpstreamMinClock(sender, task) case UpdateClock(task, clock) => val processorClock = clocks.get(task.processorId) @@ -225,10 +224,9 @@ class ClockService( LOG.error(s"Cannot updateClock for task $task") } if (Instant.ofEpochMilli(minClock).equals(Watermark.MAX)) { - healthCheckScheduler.cancel() appMaster ! EndingClock } else { - getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_)) + sendBackUpstreamMinClock(sender, task) } case GetLatestMinClock => @@ -263,7 +261,7 @@ class ClockService( case GetCheckpointClock => sender ! CheckpointClock(minCheckpointClock) - case getStalling: GetStallingTasks => + case GetStallingTasks => sender ! StallingTasks(healthChecker.getReport.stallingTasks) case ChangeToNewDAG(newDag) => @@ -282,6 +280,10 @@ class ClockService( }) } + private def sendBackUpstreamMinClock(sender: ActorRef, task: TaskId): Unit = { + sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId)) + } + private def removeProcessor(processorId: ProcessorId): Unit = { clocks = clocks - processorId processorClocks = processorClocks.filter(_.processorId != processorId) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 318ebf8..10648b4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -30,14 +30,19 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.metrics.Metrics import org.apache.gearpump.serializer.SerializationFramework import org.apache.gearpump.streaming.AppMasterToExecutor._ +import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.streaming.task.TaskActor._ import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp} +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ + + /** - * - * All tasks of Gearpump runs inside a Actor. TaskActor is the Actor container for a task. + * All tasks of Gearpump run inside an Actor. TaskActor is the Actor container for a task. */ class TaskActor( val taskId: TaskId, @@ -48,20 +53,17 @@ class TaskActor( extends Actor with ExpressTransport with TimeOutScheduler { private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS private var _minClock: TimeStamp = MIN_TIME_MILLIS - private var minClockReported: Boolean = true def serializerPool: SerializationFramework = inputSerializerPool - import taskContextData._ - - import org.apache.gearpump.streaming.Constants._ - import org.apache.gearpump.streaming.task.TaskActor._ - val config = context.system.settings.config + private val config = context.system.settings.config - val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId) + val LOG: Logger = LogUtil.getLogger(getClass, + app = taskContextData.appId, executor = taskContextData.executorId, task = taskId) // Metrics - private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}" + private val metricName = + s"app${taskContextData.appId}.processor${taskId.processorId}.task${taskId.index}" private val receiveLatency = Metrics(context.system).histogram( s"$metricName:receiveLatency", sampleRate = 1) private val processTime = Metrics(context.system).histogram(s"$metricName:processTime") @@ -76,7 +78,6 @@ class TaskActor( private var life = taskContextData.life // Latency probe - import scala.concurrent.duration._ import context.dispatcher final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS) @@ -137,7 +138,7 @@ class TaskActor( } final override def preStart(): Unit = { - val register = RegisterTask(taskId, executorId, local) + val register = RegisterTask(taskId, taskContextData.executorId, local) LOG.info(s"$register") executor ! register context.become(waitForTaskRegistered) @@ -175,15 +176,14 @@ class TaskActor( private def onStartClock(): Unit = { LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId") - subscriptions = subscribers.map { subscriber => + subscriptions = taskContextData.subscribers.map { subscriber => (subscriber.processorId, - new Subscription(appId, executorId, taskId, subscriber, sessionId, this, - maxPendingMessageCount, ackOnceEveryMessageCount)) + new Subscription(taskContextData.appId, taskContextData.executorId, taskId, subscriber, + sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)) }.sortBy(_._1) subscriptions.foreach(_._2.start()) - import scala.collection.JavaConverters._ stashQueue.asScala.foreach { item => handleMessages(item.sender).apply(item.msg) } @@ -194,13 +194,13 @@ class TaskActor( // target onStart(Instant.ofEpochMilli(_minClock)) - appMaster ! GetUpstreamMinClock(taskId) + taskContextData.appMaster ! GetUpstreamMinClock(taskId) context.become(handleMessages(sender)) } def waitForTaskRegistered: Receive = { - case start@TaskRegistered(_, sessionId, startClock) => - this.sessionId = sessionId + case TaskRegistered(_, id, startClock) => + this.sessionId = id this._minClock = startClock context.become(waitForStartClock) } @@ -208,7 +208,8 @@ class TaskActor( private val stashQueue = new util.LinkedList[MessageAndSender]() def waitForStartClock: Receive = { - case start: StartTask => + case start@StartTask(tid) => + assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}") onStartClock() case other: AnyRef => stashQueue.add(MessageAndSender(other, sender())) @@ -221,6 +222,7 @@ class TaskActor( queue.add(SendAck(ackResponse, ackRequest.taskId)) doHandleMessage() } + case ackRequest: AckRequest => // Enqueue to handle the ackRequest and send back ack later val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, @@ -229,36 +231,45 @@ class TaskActor( queue.add(SendAck(ackResponse, ackRequest.taskId)) doHandleMessage() } + case ack: Ack => subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack)) doHandleMessage() + case inputMessage: SerializedMessage => val message = Message(serializerPool.get().deserialize(inputMessage.bytes), inputMessage.timeStamp) receiveMessage(message, sender) + case inputMessage: Message => receiveMessage(inputMessage, sender) - case watermark@Watermark(instant) => - if (self.eq(sender) && minClockReported) { - updateUpstreamMinClock(instant.toEpochMilli) - minClockReported = false - } + case watermark@Watermark(instant) => + assert(sender.eq(self), "Watermark should only be sent from Task to itself") + onUpstreamMinClock(instant.toEpochMilli) receiveMessage(watermark.toMessage, sender) case UpstreamMinClock(upstreamClock) => - updateUpstreamMinClock(upstreamClock) - - case ChangeTask(_, dagVersion, life, subscribers) => - this.life = life + // 1. received from ClockService and report minClock back after CLOCK_REPORT_INTERVAL + // then ClockService will send another update and loop + // 2. The loop is kicked off by GetUpstreamMinClock on start + // 3. upstreamClock is None for source task since it's reported as watermark above + // by external source + // 4. this is designed to avoid flooding the ClockService + upstreamClock.foreach(onUpstreamMinClock) + reportMinClock() + + case ChangeTask(_, dagVersion, newLife, subscribers) => + this.life = newLife subscribers.foreach { subscriber => val processorId = subscriber.processorId val subscription = getSubscription(processorId) subscription match { - case Some(subscription) => - subscription.changeLife(subscriber.lifeTime cross this.life) + case Some(subs) => + subs.changeLife(subscriber.lifeTime cross this.life) case None => - val subscription = new Subscription(appId, executorId, taskId, subscriber, + val subscription = new Subscription(taskContextData.appId, + taskContextData.executorId, taskId, subscriber, sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount) subscription.start() subscriptions :+=(subscriber.processorId, subscription) @@ -267,11 +278,13 @@ class TaskActor( } } sender ! TaskChanged(taskId, dagVersion) + case LatencyProbe(timeStamp) => receiveLatency.update(System.currentTimeMillis() - timeStamp) - case send: SendMessageLoss => - LOG.info("received SendMessageLoss") + + case SendMessageLoss => throw new MsgLostException + case other: AnyRef => queue.add(other) doHandleMessage() @@ -310,7 +323,19 @@ class TaskActor( subscriptions.find(_._1 == processorId).map(_._2) } - private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = { + /** + * On receiving upstream min clock, this task will + * + * 1. update its upstreamMinClock and trigger watermark progress method + * if the new value is larger + * 2. update its own min clock + * 3. check for its own lifetime + * + * @param upstreamClock for DataSourceTask the clock comes from itself by DataSource.getWatermark + * for other tasks, the clock comes from that reported to ClockService + * by upstream tasks + */ + private def onUpstreamMinClock(upstreamClock: TimeStamp): Unit = { if (upstreamClock > this.upstreamMinClock) { this.upstreamMinClock = upstreamClock task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock)) @@ -328,22 +353,22 @@ class TaskActor( _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock)) - val update = UpdateClock(taskId, _minClock) - context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) { - appMaster ! update - minClockReported = true - } - - // Checks whether current task is dead. if (_minClock > life.death) { // There will be no more message received... - val unRegister = UnRegisterTask(taskId, executorId) + val unRegister = UnRegisterTask(taskId, taskContextData.executorId) executor ! unRegister LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life") } } + + private def reportMinClock(): Unit = { + val update = UpdateClock(taskId, _minClock) + context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) { + taskContextData.appMaster ! update + } + } } object TaskActor { @@ -412,7 +437,7 @@ object TaskActor { case object FLUSH - val NONE_SESSION = -1 + val NONE_SESSION: Int = -1 case class MessageAndSender(msg: AnyRef, sender: ActorRef) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala index 73cd5af..c2e2faa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala @@ -54,7 +54,7 @@ case object GetCheckpointClock extends ClockEvent case class CheckpointClock(clock: Option[TimeStamp]) -case class UpstreamMinClock(latestMinClock: TimeStamp) +case class UpstreamMinClock(latestMinClock: Option[TimeStamp]) case class LatestMinClock(clock: TimeStamp) @@ -67,7 +67,7 @@ case object EndingClock /** Probe the latency between two upstream to downstream tasks. */ case class LatencyProbe(timestamp: Long) -case class SendMessageLoss() +case object SendMessageLoss case object GetDAG http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index c1791aa..4faa058 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -154,6 +154,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with // clock status: task(0,0) -> 1, task(0,1)->0, task(1,0)->0, task(1,1)->0 appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref) + mockTask.expectMsg(UpstreamMinClock(None)) // check min clock appMaster.tell(GetLatestMinClock, mockTask.ref) @@ -161,6 +162,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0 appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref) + mockTask.expectMsg(UpstreamMinClock(None)) // check min clock appMaster.tell(GetLatestMinClock, mockTask.ref) @@ -170,7 +172,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref) // Min clock of processor 0 (Task(0, 0) and Task(0, 1)) - mockTask.expectMsg(UpstreamMinClock(1)) + mockTask.expectMsg(UpstreamMinClock(Some(1))) // check min clock appMaster.tell(GetLatestMinClock, mockTask.ref) @@ -180,7 +182,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref) // min clock of processor 0 (Task(0, 0) and Task(0, 1)) - mockTask.expectMsg(UpstreamMinClock(1)) + mockTask.expectMsg(UpstreamMinClock(Some(1))) // check min clock appMaster.tell(GetLatestMinClock, mockTask.ref) @@ -228,6 +230,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with for (i <- 1 to 5) { val taskId = TaskId(0, 0) appMaster.tell(UpdateClock(taskId, i), mockTask.ref) + mockTask.expectMsgType[UpstreamMinClock] val cause = s"message loss $i from $taskId" appMaster.tell(MessageLoss(0, taskId, cause), mockTask.ref) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala index 4b824e0..ae78980 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala @@ -53,11 +53,13 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli val startClock = 100L store.put(ClockService.START_CLOCK, startClock) val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store))) + clockService ! GetLatestMinClock expectMsg(LatestMinClock(startClock)) // task(0,0): clock(101); task(1,0): clock(100) clockService ! UpdateClock(TaskId(0, 0), 101) + expectMsg(UpstreamMinClock(None)) // Min clock is updated clockService ! GetLatestMinClock @@ -67,7 +69,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli clockService ! UpdateClock(TaskId(1, 0), 101) // Upstream is Task(0, 0), 101 - expectMsg(UpstreamMinClock(101)) + expectMsg(UpstreamMinClock(Some(101))) // Min clock is updated clockService ! GetLatestMinClock @@ -121,6 +123,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli clockService ! UpdateClock(TaskId(0, 0), 200L) clockService ! UpdateClock(TaskId(1, 0), 200L) expectMsgType[UpstreamMinClock] + expectMsgType[UpstreamMinClock] clockService ! GetStartClock expectMsg(StartClock(200L))
