Repository: incubator-gearpump Updated Branches: refs/heads/master b6f5ccd6e -> f96aca995
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 1fb61bd..fb2aaed 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit import akka.actor._ import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.source.Watermark import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig @@ -35,7 +37,6 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId import org.apache.gearpump.streaming.task.TaskActor._ import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} -import org.apache.gearpump.{Message, TimeStamp} import scala.collection.JavaConverters._ import scala.concurrent.duration._ @@ -141,7 +142,7 @@ class TaskActor( context.become(waitForStartTask(startClock)) } - def waitForStartTask(startClock: TimeStamp): Receive = { + def waitForStartTask(startClock: MilliSeconds): Receive = { case start@StartTask(tid) => assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}") onStartTask(startClock) @@ -227,7 +228,7 @@ class TaskActor( /** * Returns min clock of upstream task */ - def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli + def getUpstreamMinClock: MilliSeconds = upstreamWatermark.toEpochMilli def getProcessingWatermark: Instant = processingWatermark @@ -265,7 +266,7 @@ class TaskActor( count } - private def onStartTask(startClock: 
TimeStamp): Unit = { + private def onStartTask(startClock: MilliSeconds): Unit = { LOG.info(s"received start, clock: $startClock, sessionId: $sessionId") subscriptions = taskContextData.subscribers.map { subscriber => (subscriber.processorId, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala index c2e2faa..4ba9315 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.task -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.ProcessorId /* @@ -42,25 +42,25 @@ case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: sealed trait ClockEvent -case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent +case class UpdateClock(taskId: TaskId, time: MilliSeconds) extends ClockEvent object GetLatestMinClock extends ClockEvent case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent -case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends ClockEvent +case class UpdateCheckpointClock(taskId: TaskId, clock: MilliSeconds) extends ClockEvent case object GetCheckpointClock extends ClockEvent -case class CheckpointClock(clock: Option[TimeStamp]) +case class CheckpointClock(clock: Option[MilliSeconds]) -case class UpstreamMinClock(latestMinClock: Option[TimeStamp]) +case class UpstreamMinClock(latestMinClock: Option[MilliSeconds]) -case class LatestMinClock(clock: TimeStamp) +case class 
LatestMinClock(clock: MilliSeconds) case object GetStartClock -case class StartClock(clock: TimeStamp) +case class StartClock(clock: MilliSeconds) case object EndingClock http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala index 82cae96..1e4430b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala @@ -23,10 +23,11 @@ import java.time.Instant import scala.concurrent.duration.FiniteDuration import akka.actor.Actor._ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} -import org.slf4j.Logger +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} +import org.slf4j.Logger /** * This provides TaskContext for user defined tasks @@ -107,7 +108,7 @@ class TaskWrapper( task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler) } - override def upstreamMinClock: TimeStamp = { + override def upstreamMinClock: MilliSeconds = { actor.getUpstreamMinClock } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala index 1ef255e..8d026db 100644 --- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.transaction.api -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds /** * CheckpointStore persistently stores mapping of timestamp to checkpoint @@ -27,9 +27,9 @@ import org.apache.gearpump.TimeStamp */ trait CheckpointStore { - def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit + def persist(timeStamp: MilliSeconds, checkpoint: Array[Byte]): Unit - def recover(timestamp: TimeStamp): Option[Array[Byte]] + def recover(timestamp: MilliSeconds): Option[Array[Byte]] def close(): Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala index 2ddca3a..856b1c5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala @@ -18,12 +18,13 @@ package org.apache.gearpump.streaming.transaction.api -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds /** - * TimeStampFilter filters out messages that are obsolete. + * TimeStampFilter filters out messages that are obsolete. 
*/ trait TimeStampFilter extends java.io.Serializable { - def filter(msg: Message, predicate: TimeStamp): Option[Message] + def filter(msg: Message, predicate: MilliSeconds): Option[Message] } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index c223a53..d3bd51b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, Proce import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} @@ -238,7 +238,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { // Step11: Tell ClockService to update DAG. 
clockService.expectMsgType[ChangeToNewDAG] - clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp])) + clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, MilliSeconds])) // Step12: start all tasks import scala.concurrent.duration._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala index 9e42e85..bf50fad 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala @@ -25,7 +25,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.transaction.api.CheckpointStore class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -34,7 +34,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L) property("CheckpointManager should recover from CheckpointStore") { forAll(timestampGen, checkpointIntervalGen) { - (timestamp: TimeStamp, checkpointInterval: Long) => + (timestamp: MilliSeconds, checkpointInterval: Long) => val checkpointStore = mock[CheckpointStore] val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) @@ -47,7 +47,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w property("CheckpointManager should write checkpoint to 
CheckpointStore") { val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8")) forAll(timestampGen, checkpointIntervalGen, checkpointGen) { - (timestamp: TimeStamp, checkpointInterval: Long, checkpoint: Array[Byte]) => + (timestamp: MilliSeconds, checkpointInterval: Long, checkpoint: Array[Byte]) => val checkpointStore = mock[CheckpointStore] val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) @@ -70,7 +70,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w property("CheckpointManager should update checkpoint time according to max message timestamp") { forAll(timestampGen, checkpointIntervalGen) { - (timestamp: TimeStamp, checkpointInterval: Long) => + (timestamp: MilliSeconds, checkpointInterval: Long) => val checkpointStore = mock[CheckpointStore] val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala index 4cdff95..41b0624 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala @@ -26,7 +26,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.state.api.{Monoid, Serializer} class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -35,7 +35,7 @@ class NonWindowStateSpec 
extends PropSpec with PropertyChecks with Matchers with property("NonWindowState should recover checkpointed state at given timestamp") { forAll(longGen) { - (timestamp: TimeStamp) => + (timestamp: MilliSeconds) => val monoid = mock[Monoid[AnyRef]] val serializer = mock[Serializer[AnyRef]] val bytes = Array.empty[Byte] @@ -61,7 +61,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with property("NonWindowState checkpoints state") { forAll(longGen) { - (checkpointTime: TimeStamp) => + (checkpointTime: MilliSeconds) => val monoid = mock[Monoid[AnyRef]] val serializer = mock[Serializer[AnyRef]] @@ -95,7 +95,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with property("NonWindowState updates state") { forAll(longGen) { - (checkpointTime: TimeStamp) => + (checkpointTime: MilliSeconds) => val monoid = mock[Monoid[AnyRef]] val serializer = mock[Serializer[AnyRef]] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala index d9282ae..9975f49 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala @@ -23,7 +23,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -32,7 +32,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito val timestampGen = 
Gen.chooseNum[Long](0L, 1000L) property("Window should only slide when time passes window end") { forAll(timestampGen, windowSizeGen, windowStepGen) { - (timestamp: TimeStamp, windowSize: Long, windowStep: Long) => + (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) => val window = new Window(windowSize, windowStep) window.shouldSlide shouldBe false window.update(timestamp) @@ -42,7 +42,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito property("Window should slide by one or to given timestamp") { forAll(timestampGen, windowSizeGen, windowStepGen) { - (timestamp: TimeStamp, windowSize: Long, windowStep: Long) => + (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) => val window = new Window(windowSize, windowStep) window.range shouldBe(0L, windowSize) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala index 299a626..2b784bf 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala @@ -18,16 +18,15 @@ package org.apache.gearpump.streaming.state.impl +import org.apache.gearpump.Time.MilliSeconds + import scala.collection.immutable.TreeMap import scala.util.Success - import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} - -import org.apache.gearpump._ import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.state.api.{Group, Serializer} @@ -74,7 +73,7 @@ class 
WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo } property("WindowState checkpoints") { - forAll(longGen) { (checkpointTime: TimeStamp) => + forAll(longGen) { (checkpointTime: MilliSeconds) => val window = mock[Window] val taskContext = MockUtil.mockTaskContext val group = mock[Group[AnyRef]] @@ -120,7 +119,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo } property("WindowState updates state") { - forAll(longGen) { (checkpointTime: TimeStamp) => + forAll(longGen) { (checkpointTime: MilliSeconds) => val window = mock[Window] val taskContext = MockUtil.mockTaskContext val group = mock[Group[AnyRef]] @@ -205,7 +204,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo property("WindowState gets interval for timestamp") { forAll(longGen, longGen, longGen, longGen) { - (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) => + (timestamp: MilliSeconds, checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long) => val windowManager = new Window(windowSize, windowStep) val taskContext = MockUtil.mockTaskContext val group = mock[Group[AnyRef]] @@ -225,8 +224,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo interval.endTime shouldBe nextInterval.startTime } - def intervalSpec(interval: Interval, timestamp: TimeStamp, - checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = { + def intervalSpec(interval: Interval, timestamp: MilliSeconds, + checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long): Unit = { interval.startTime should be <= interval.endTime timestamp / windowStep * windowStep should (be <= interval.startTime) (timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index 285bf44..b05befa 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -24,7 +24,7 @@ import java.util.Random import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.{MIN_TIME_MILLIS, Message} +import org.apache.gearpump.{Message, Time} import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.source.Watermark @@ -73,14 +73,14 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { subscription.sendMessage(msg1) verify(sender, times(1)).transport(msg1, TaskId(1, 1)) - assert(subscription.watermark == MIN_TIME_MILLIS) + assert(subscription.watermark == Time.MIN_TIME_MILLIS) val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50)) when(sender.getProcessingWatermark).thenReturn(msg2.timestamp) subscription.sendMessage(msg2) verify(sender, times(1)).transport(msg2, TaskId(1, 0)) - assert(subscription.watermark == MIN_TIME_MILLIS) + assert(subscription.watermark == Time.MIN_TIME_MILLIS) val initialMinClock = subscription.watermark
