Repository: incubator-gearpump Updated Branches: refs/heads/sql 54686e0e2 -> 1cf87bf77
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala index 2591856..cecf127 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala @@ -18,9 +18,7 @@ package org.apache.gearpump.streaming.state.impl -import org.apache.gearpump.TimeStamp -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory} /** @@ -28,18 +26,18 @@ import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, Checkpoin * should not be used in real cases */ class InMemoryCheckpointStore extends CheckpointStore { - private var checkpoints = Map.empty[TimeStamp, Array[Byte]] + private var checkpoints = Map.empty[MilliSeconds, Array[Byte]] - override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = { + override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = { checkpoints += timestamp -> checkpoint } - override def recover(timestamp: TimeStamp): Option[Array[Byte]] = { + override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = { checkpoints.get(timestamp) } override def close(): Unit = { - checkpoints = Map.empty[TimeStamp, Array[Byte]] + checkpoints = Map.empty[MilliSeconds, Array[Byte]] } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala index b161713..1393f4a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.state.impl import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer} import org.apache.gearpump.streaming.state.impl.NonWindowState._ import org.apache.gearpump.util.LogUtil @@ -35,11 +35,11 @@ object NonWindowState { class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T]) extends MonoidState[T](monoid) { - override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = { + override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = { serializer.deserialize(bytes).foreach(left = _) } - override def update(timestamp: TimeStamp, t: T): Unit = { + override def update(timestamp: MilliSeconds, t: T): Unit = { updateState(timestamp, t) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala index c1f647e..0318d3d 100644 --- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.streaming.state.impl -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds /** * Used in window applications @@ -29,10 +29,10 @@ class Window(val windowSize: Long, val windowStep: Long) { this(windowConfig.windowSize, windowConfig.windowStep) } - private var clock: TimeStamp = 0L + private var clock: MilliSeconds = 0L private var startTime = 0L - def update(clock: TimeStamp): Unit = { + def update(clock: MilliSeconds): Unit = { this.clock = clock } @@ -40,7 +40,7 @@ class Window(val windowSize: Long, val windowStep: Long) { startTime += windowStep } - def slideTo(timestamp: TimeStamp): Unit = { + def slideTo(timestamp: MilliSeconds): Unit = { startTime = timestamp / windowStep * windowStep } @@ -48,7 +48,7 @@ class Window(val windowSize: Long, val windowStep: Long) { clock >= (startTime + windowSize) } - def range: (TimeStamp, TimeStamp) = { + def range: (MilliSeconds, MilliSeconds) = { startTime -> (startTime + windowSize) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala index 348f09e..a73b6db 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala @@ -22,7 +22,7 @@ import scala.collection.immutable.TreeMap import org.slf4j.Logger -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import 
org.apache.gearpump.streaming.state.api.{Group, MonoidState, Serializer} import org.apache.gearpump.streaming.state.impl.WindowState._ import org.apache.gearpump.streaming.task.TaskContext @@ -31,7 +31,7 @@ import org.apache.gearpump.util.LogUtil /** * an interval is a dynamic time range that is divided by window boundary and checkpoint time */ -case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends Ordered[Interval] { +case class Interval(startTime: MilliSeconds, endTime: MilliSeconds) extends Ordered[Interval] { override def compare(that: Interval): Int = { if (startTime < that.startTime) -1 else if (startTime > that.startTime) 1 @@ -63,7 +63,7 @@ class WindowState[T](group: Group[T], private var lastCheckpointTime = 0L - override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = { + override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = { window.slideTo(timestamp) serializer.deserialize(bytes) .foreach { states => @@ -74,7 +74,7 @@ class WindowState[T](group: Group[T], } } - override def update(timestamp: TimeStamp, t: T): Unit = { + override def update(timestamp: MilliSeconds, t: T): Unit = { val (startTime, endTime) = window.range if (timestamp >= startTime && timestamp < endTime) { updateState(timestamp, t) @@ -127,7 +127,7 @@ class WindowState[T](group: Group[T], * upperBound2 = step * Nmin2 + size > t * }}} */ - private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = { + private[impl] def getInterval(timestamp: MilliSeconds, checkpointTime: MilliSeconds): Interval = { val windowSize = window.windowSize val windowStep = window.windowStep val lowerBound1 = timestamp / windowStep * windowStep @@ -147,8 +147,8 @@ class WindowState[T](group: Group[T], } } - private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp) - : Unit = { + private[impl] def updateIntervalStates(timestamp: MilliSeconds, t: T, + checkpointTime: MilliSeconds): Unit = { val 
interval = getInterval(timestamp, checkpointTime) intervalStates.get(interval) match { case Some(st) => @@ -158,7 +158,7 @@ class WindowState[T](group: Group[T], } } - private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp) + private[impl] def getIntervalStates(startTime: MilliSeconds, endTime: MilliSeconds) : TreeMap[Interval, T] = { intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala index 675d5cc..c3e3b14 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala @@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.task import java.io.{DataInput, DataOutput} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds -case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte]) +case class SerializedMessage(timeStamp: MilliSeconds, bytes: Array[Byte]) class SerializedMessageSerializer extends TaskMessageSerializer[SerializedMessage] { override def getLength(obj: SerializedMessage): Int = 12 + obj.bytes.length http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index 8a6f04f..24f1763 100644 --- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -20,13 +20,14 @@ package org.apache.gearpump.streaming.task import org.slf4j.Logger import com.google.common.primitives.Shorts +import org.apache.gearpump.{Message, Time} +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner} import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException import org.apache.gearpump.streaming.LifeTime import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.task.Subscription._ import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} /** * Manages the output and message clock for single downstream processor @@ -59,9 +60,9 @@ class Subscription( private val pendingMessageCount: Array[Short] = new Array[Short](parallelism) private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism) - private val outputWatermark: Array[TimeStamp] = Array.fill(parallelism)( + private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)( Watermark.MIN.toEpochMilli) - private val processingWatermark: Array[TimeStamp] = Array.fill(parallelism)( + private val processingWatermark: Array[MilliSeconds] = Array.fill(parallelism)( Watermark.MIN.toEpochMilli) private var maxPendingCount: Short = 0 @@ -135,7 +136,7 @@ class Subscription( } } - private var lastFlushTime: Long = MIN_TIME_MILLIS + private var lastFlushTime: Long = Time.MIN_TIME_MILLIS private val FLUSH_INTERVAL = 5 * 1000 // ms private def needFlush: Boolean = { System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL && @@ -181,7 +182,7 @@ class Subscription( } } - def watermark: TimeStamp = { + def watermark: MilliSeconds = { outputWatermark.min } @@ -189,7 +190,7 @@ class Subscription( maxPendingCount < 
maxPendingMessageCount } - def onStallingTime(stallingTime: TimeStamp): Unit = { + def onStallingTime(stallingTime: MilliSeconds): Unit = { outputWatermark.indices.foreach { i => if (outputWatermark(i) == stallingTime && pendingMessageCount(i) > 0 && http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala index dc80511..b587cc7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala @@ -23,10 +23,11 @@ import java.time.Instant import scala.concurrent.duration.FiniteDuration import akka.actor.Actor.Receive import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} /** * This provides context information for a task. 
@@ -113,7 +114,7 @@ trait TaskContext { * * @return the min clock */ - def upstreamMinClock: TimeStamp + def upstreamMinClock: MilliSeconds /** * Update TaskActor with the processing progress (watermark) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index 1fb61bd..fb2aaed 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit import akka.actor._ import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.source.Watermark import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig @@ -35,7 +37,6 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._ import org.apache.gearpump.streaming.ProcessorId import org.apache.gearpump.streaming.task.TaskActor._ import org.apache.gearpump.util.{LogUtil, TimeOutScheduler} -import org.apache.gearpump.{Message, TimeStamp} import scala.collection.JavaConverters._ import scala.concurrent.duration._ @@ -141,7 +142,7 @@ class TaskActor( context.become(waitForStartTask(startClock)) } - def waitForStartTask(startClock: TimeStamp): Receive = { + def waitForStartTask(startClock: MilliSeconds): Receive = { case start@StartTask(tid) => assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}") onStartTask(startClock) @@ -227,7 +228,7 @@ class TaskActor( /** * Returns min clock of upstream task */ - def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli + def 
getUpstreamMinClock: MilliSeconds = upstreamWatermark.toEpochMilli def getProcessingWatermark: Instant = processingWatermark @@ -265,7 +266,7 @@ class TaskActor( count } - private def onStartTask(startClock: TimeStamp): Unit = { + private def onStartTask(startClock: MilliSeconds): Unit = { LOG.info(s"received start, clock: $startClock, sessionId: $sessionId") subscriptions = taskContextData.subscribers.map { subscriber => (subscriber.processorId, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala index c2e2faa..4ba9315 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala @@ -18,7 +18,7 @@ package org.apache.gearpump.streaming.task -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.ProcessorId /* @@ -42,25 +42,25 @@ case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: sealed trait ClockEvent -case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent +case class UpdateClock(taskId: TaskId, time: MilliSeconds) extends ClockEvent object GetLatestMinClock extends ClockEvent case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent -case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends ClockEvent +case class UpdateCheckpointClock(taskId: TaskId, clock: MilliSeconds) extends ClockEvent case object GetCheckpointClock extends ClockEvent -case class CheckpointClock(clock: Option[TimeStamp]) +case class CheckpointClock(clock: Option[MilliSeconds]) 
-case class UpstreamMinClock(latestMinClock: Option[TimeStamp]) +case class UpstreamMinClock(latestMinClock: Option[MilliSeconds]) -case class LatestMinClock(clock: TimeStamp) +case class LatestMinClock(clock: MilliSeconds) case object GetStartClock -case class StartClock(clock: TimeStamp) +case class StartClock(clock: MilliSeconds) case object EndingClock http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala index 82cae96..1e4430b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala @@ -23,10 +23,11 @@ import java.time.Instant import scala.concurrent.duration.FiniteDuration import akka.actor.Actor._ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} -import org.slf4j.Logger +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} +import org.slf4j.Logger /** * This provides TaskContext for user defined tasks @@ -107,7 +108,7 @@ class TaskWrapper( task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler) } - override def upstreamMinClock: TimeStamp = { + override def upstreamMinClock: MilliSeconds = { actor.getUpstreamMinClock } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala index 4650ac2..8d026db 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala @@ -18,9 +18,7 @@ package org.apache.gearpump.streaming.transaction.api -import org.apache.gearpump.TimeStamp -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.Time.MilliSeconds /** * CheckpointStore persistently stores mapping of timestamp to checkpoint @@ -29,9 +27,9 @@ import org.apache.gearpump.streaming.task.TaskContext */ trait CheckpointStore { - def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit + def persist(timeStamp: MilliSeconds, checkpoint: Array[Byte]): Unit - def recover(timestamp: TimeStamp): Option[Array[Byte]] + def recover(timestamp: MilliSeconds): Option[Array[Byte]] def close(): Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala index 2ddca3a..856b1c5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala @@ -18,12 +18,13 @@ package org.apache.gearpump.streaming.transaction.api -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message +import org.apache.gearpump.Time.MilliSeconds /** 
- * TimeStampFilter filters out messages that are obsolete. + * TimeStampFilter filters out messages that are obsolete. */ trait TimeStampFilter extends java.io.Serializable { - def filter(msg: Message, predicate: TimeStamp): Option[Message] + def filter(msg: Message, predicate: MilliSeconds): Option[Message] } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala index c8478f6..d75a1a0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala @@ -20,7 +20,6 @@ package org.apache.gearpump.streaming import akka.actor._ import akka.testkit.TestActorRef import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import org.apache.gearpump.cluster.appmaster.ApplicationRuntimeInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig} import org.apache.gearpump.streaming.appmaster.AppMaster http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index 4faa058..8819c0c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@
-17,10 +17,8 @@ */ package org.apache.gearpump.streaming.appmaster - import akka.actor.{ActorRef, ActorSystem, Props} import akka.testkit.{TestActorRef, TestProbe} -import com.typesafe.config.ConfigFactory import org.apache.gearpump.cluster.AppMasterToMaster._ import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor import org.apache.gearpump.cluster.ClientToMaster.GetLastFailure @@ -40,7 +38,7 @@ import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, Ta import org.apache.gearpump.streaming.task.{TaskContext, _} import org.apache.gearpump.streaming.{DAG, Processor, StreamApplication} import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem -import org.apache.gearpump.util.{ActorUtil, Constants, Graph} +import org.apache.gearpump.util.{ActorUtil, Graph} import org.apache.gearpump.util.Graph._ import org.scalatest._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala index adde927..7020210 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -18,7 +18,6 @@ package org.apache.gearpump.streaming.appmaster - import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe import org.apache.gearpump.cluster.{TestUtil, UserConfig} @@ -28,7 +27,6 @@ import org.apache.gearpump.streaming.task.{Subscriber, TaskActor} import org.apache.gearpump.streaming._ import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ -import org.scalatest.mock.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, 
WordSpecLike} import scala.concurrent.Await http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index c223a53..d3bd51b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, Proce import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} @@ -238,7 +238,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { // Step11: Tell ClockService to update DAG. 
clockService.expectMsgType[ChangeToNewDAG] - clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp])) + clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, MilliSeconds])) // Step12: start all tasks import scala.concurrent.duration._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala index 0f87a1c..9e6bf59 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows import org.apache.gearpump.streaming.{Constants, MockUtil} -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner import org.apache.gearpump.streaming.source.Watermark import org.mockito.Mockito._ import org.scalacheck.Gen http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index 0bb4d6a..e38c5a3 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -23,7 +23,6 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner} -import org.mockito.{Matchers => MockitoMatchers} import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala index 9e42e85..bf50fad 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala @@ -25,7 +25,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.transaction.api.CheckpointStore class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -34,7 +34,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L) property("CheckpointManager should recover from CheckpointStore") { forAll(timestampGen, checkpointIntervalGen) { - (timestamp: TimeStamp, checkpointInterval: Long) => + (timestamp: MilliSeconds, checkpointInterval: Long) => val checkpointStore = mock[CheckpointStore] val 
checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) @@ -47,7 +47,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w property("CheckpointManager should write checkpoint to CheckpointStore") { val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8")) forAll(timestampGen, checkpointIntervalGen, checkpointGen) { - (timestamp: TimeStamp, checkpointInterval: Long, checkpoint: Array[Byte]) => + (timestamp: MilliSeconds, checkpointInterval: Long, checkpoint: Array[Byte]) => val checkpointStore = mock[CheckpointStore] val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) @@ -70,7 +70,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w property("CheckpointManager should update checkpoint time according to max message timestamp") { forAll(timestampGen, checkpointIntervalGen) { - (timestamp: TimeStamp, checkpointInterval: Long) => + (timestamp: MilliSeconds, checkpointInterval: Long) => val checkpointStore = mock[CheckpointStore] val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala index 4cdff95..41b0624 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala @@ -26,7 +26,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import 
org.apache.gearpump.Time.MilliSeconds import org.apache.gearpump.streaming.state.api.{Monoid, Serializer} class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -35,7 +35,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with property("NonWindowState should recover checkpointed state at given timestamp") { forAll(longGen) { - (timestamp: TimeStamp) => + (timestamp: MilliSeconds) => val monoid = mock[Monoid[AnyRef]] val serializer = mock[Serializer[AnyRef]] val bytes = Array.empty[Byte] @@ -61,7 +61,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with property("NonWindowState checkpoints state") { forAll(longGen) { - (checkpointTime: TimeStamp) => + (checkpointTime: MilliSeconds) => val monoid = mock[Monoid[AnyRef]] val serializer = mock[Serializer[AnyRef]] @@ -95,7 +95,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with property("NonWindowState updates state") { forAll(longGen) { - (checkpointTime: TimeStamp) => + (checkpointTime: MilliSeconds) => val monoid = mock[Monoid[AnyRef]] val serializer = mock[Serializer[AnyRef]] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala index d9282ae..9975f49 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala @@ -23,7 +23,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.TimeStamp +import 
org.apache.gearpump.Time.MilliSeconds class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -32,7 +32,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito val timestampGen = Gen.chooseNum[Long](0L, 1000L) property("Window should only slide when time passes window end") { forAll(timestampGen, windowSizeGen, windowStepGen) { - (timestamp: TimeStamp, windowSize: Long, windowStep: Long) => + (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) => val window = new Window(windowSize, windowStep) window.shouldSlide shouldBe false window.update(timestamp) @@ -42,7 +42,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito property("Window should slide by one or to given timestamp") { forAll(timestampGen, windowSizeGen, windowStepGen) { - (timestamp: TimeStamp, windowSize: Long, windowStep: Long) => + (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) => val window = new Window(windowSize, windowStep) window.range shouldBe(0L, windowSize) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala index 299a626..2b784bf 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala @@ -18,16 +18,15 @@ package org.apache.gearpump.streaming.state.impl +import org.apache.gearpump.Time.MilliSeconds + import scala.collection.immutable.TreeMap import scala.util.Success - import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import 
org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} - -import org.apache.gearpump._ import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.state.api.{Group, Serializer} @@ -74,7 +73,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo } property("WindowState checkpoints") { - forAll(longGen) { (checkpointTime: TimeStamp) => + forAll(longGen) { (checkpointTime: MilliSeconds) => val window = mock[Window] val taskContext = MockUtil.mockTaskContext val group = mock[Group[AnyRef]] @@ -120,7 +119,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo } property("WindowState updates state") { - forAll(longGen) { (checkpointTime: TimeStamp) => + forAll(longGen) { (checkpointTime: MilliSeconds) => val window = mock[Window] val taskContext = MockUtil.mockTaskContext val group = mock[Group[AnyRef]] @@ -205,7 +204,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo property("WindowState gets interval for timestamp") { forAll(longGen, longGen, longGen, longGen) { - (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) => + (timestamp: MilliSeconds, checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long) => val windowManager = new Window(windowSize, windowStep) val taskContext = MockUtil.mockTaskContext val group = mock[Group[AnyRef]] @@ -225,8 +224,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo interval.endTime shouldBe nextInterval.startTime } - def intervalSpec(interval: Interval, timestamp: TimeStamp, - checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = { + def intervalSpec(interval: Interval, timestamp: MilliSeconds, + checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long): Unit = { interval.startTime should be <= interval.endTime timestamp / windowStep * windowStep should (be <= interval.startTime) (timestamp - 
windowSize) / windowStep * windowStep should (be <= interval.startTime) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index 285bf44..b05befa 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -24,7 +24,7 @@ import java.util.Random import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.{MIN_TIME_MILLIS, Message} +import org.apache.gearpump.{Message, Time} import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.source.Watermark @@ -73,14 +73,14 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { subscription.sendMessage(msg1) verify(sender, times(1)).transport(msg1, TaskId(1, 1)) - assert(subscription.watermark == MIN_TIME_MILLIS) + assert(subscription.watermark == Time.MIN_TIME_MILLIS) val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50)) when(sender.getProcessingWatermark).thenReturn(msg2.timestamp) subscription.sendMessage(msg2) verify(sender, times(1)).transport(msg2, TaskId(1, 0)) - assert(subscription.watermark == MIN_TIME_MILLIS) + assert(subscription.watermark == Time.MIN_TIME_MILLIS) val initialMinClock = subscription.watermark
