Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 24e1a4546 -> 83e1eb636


[GEARPUMP-317] Fix Task minClock

Author: manuzhang <[email protected]>

Closes #187 from manuzhang/watermark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/83e1eb63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/83e1eb63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/83e1eb63

Branch: refs/heads/master
Commit: 83e1eb63643274d3914470cbb60b9908b2885e8e
Parents: 24e1a45
Author: manuzhang <[email protected]>
Authored: Mon Jun 19 11:46:32 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Mon Jun 19 11:46:43 2017 +0800

----------------------------------------------------------------------
 .../streaming/dsl/task/GroupByTask.scala        |  18 +-
 .../streaming/dsl/task/TransformTask.scala      |   7 +-
 .../dsl/window/impl/WindowRunner.scala          |  46 ++--
 .../streaming/source/DataSourceProcessor.scala  |  20 +-
 .../streaming/source/DataSourceTask.scala       |   9 +-
 .../gearpump/streaming/task/Subscription.scala  |  59 ++--
 .../apache/gearpump/streaming/task/Task.scala   |   5 +
 .../gearpump/streaming/task/TaskActor.scala     | 273 +++++++++----------
 .../gearpump/streaming/task/TaskUtil.scala      |  30 ++
 .../gearpump/streaming/task/TaskWrapper.scala   |   4 +
 .../streaming/dsl/task/TransformTaskSpec.scala  |   6 +-
 .../window/impl/DefaultWindowRunnerSpec.scala   |   2 +-
 .../streaming/source/DataSourceTaskSpec.scala   |   5 +-
 .../streaming/task/SubscriptionSpec.scala       |  53 ++--
 14 files changed, 298 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
index 8301fb9..b3f3ad2 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
@@ -26,7 +26,8 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import 
org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, 
GEARPUMP_STREAMING_OPERATOR}
 import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
 /**
  * Processes messages in groups as defined by groupBy function.
@@ -61,13 +62,14 @@ class GroupByTask[IN, GROUP, OUT](
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
-      override def accept(runner: WindowRunner[IN, OUT]): Unit = {
-        runner.trigger(watermark).foreach {
-          result =>
-            taskContext.output(Message(result.value, result.timestamp))
+    if (groups.isEmpty && watermark == Watermark.MAX) {
+      taskContext.updateWatermark(Watermark.MAX)
+    } else {
+      groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
+        override def accept(runner: WindowRunner[IN, OUT]): Unit = {
+          TaskUtil.trigger(watermark, runner, taskContext)
         }
-      }
-    })
+      })
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 6a455a5..5ad64fa 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
 class TransformTask[IN, OUT](
     runner: WindowRunner[IN, OUT],
@@ -41,9 +41,6 @@ class TransformTask[IN, OUT](
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    runner.trigger(watermark).foreach {
-      result =>
-        taskContext.output(Message(result.value, result.timestamp))
-    }
+    TaskUtil.trigger(watermark, runner, taskContext)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 2025618..17a9525 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -26,28 +26,34 @@ import 
com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
 import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
 import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
+import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.task.TaskUtil
 
 import scala.collection.mutable.ArrayBuffer
 
 case class TimestampedValue[T](value: T, timestamp: Instant)
 
+case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
+    watermark: Instant)
+
 trait WindowRunner[IN, OUT] extends java.io.Serializable {
 
   def process(timestampedValue: TimestampedValue[IN]): Unit
 
-  def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]]
+  def trigger(time: Instant): TriggeredOutputs[OUT]
 }
 
 case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
     right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
 
-  def process(timestampedValue: TimestampedValue[IN]): Unit = {
+  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
     left.process(timestampedValue)
   }
 
-  def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = {
-    left.trigger(time).foreach(right.process)
-    right.trigger(time)
+  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+    val lOutputs = left.trigger(time)
+    lOutputs.outputs.foreach(right.process)
+    right.trigger(lOutputs.watermark)
   }
 }
 
@@ -59,6 +65,7 @@ class DefaultWindowRunner[IN, OUT](
   private val windowFn = windows.windowFn
   private val windowInputs = new TreeSortedMap[Window, 
FastList[TimestampedValue[IN]]]
   private var setup = false
+  private var watermark = Watermark.MIN
 
   override def process(timestampedValue: TimestampedValue[IN]): Unit = {
     val wins = windowFn(new Context[IN] {
@@ -98,10 +105,11 @@ class DefaultWindowRunner[IN, OUT](
     }
   }
 
-  override def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] 
= {
+  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
     @annotation.tailrec
     def onTrigger(
-        outputs: ArrayBuffer[TimestampedValue[OUT]]): 
TraversableOnce[TimestampedValue[OUT]] = {
+        outputs: ArrayBuffer[TimestampedValue[OUT]],
+        wmk: Instant): TriggeredOutputs[OUT] = {
       if (windowInputs.notEmpty()) {
         val firstWin = windowInputs.firstKey
         if (!time.isBefore(firstWin.endTime)) {
@@ -118,25 +126,31 @@ class DefaultWindowRunner[IN, OUT](
             }
           })
           fnRunner.finish().foreach {
-            out: OUT => outputs += TimestampedValue(out, 
firstWin.endTime.minusMillis(1))
+            out: OUT =>
+              outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
           }
+          val newWmk = TaskUtil.max(wmk, firstWin.endTime)
           if (windows.accumulationMode == Discarding) {
             fnRunner.teardown()
-            setup = false
             // discarding, setup need to be called for each window
-            onTrigger(outputs)
-          } else {
-            // accumulating, setup is only called for the first window
-            onTrigger(outputs)
+            setup = false
           }
+          onTrigger(outputs, newWmk)
         } else {
-          outputs
+          // minimum of end of last triggered window and start of first 
un-triggered window
+          TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
         }
       } else {
-        outputs
+        if (time == Watermark.MAX) {
+          TriggeredOutputs(outputs, Watermark.MAX)
+        } else {
+          TriggeredOutputs(outputs, wmk)
+        }
       }
     }
 
-    onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]])
+    val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], 
watermark)
+    watermark = TaskUtil.max(watermark, triggeredOutputs.watermark)
+    TriggeredOutputs(triggeredOutputs.outputs, watermark)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
index d1cc5c8..dd4c0d3 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
@@ -20,6 +20,9 @@ package org.apache.gearpump.streaming.source
 
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
+import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows}
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
Window, WindowRunner}
 import org.apache.gearpump.streaming.{Constants, Processor}
 
 /**
@@ -43,6 +46,21 @@ object DataSourceProcessor {
       taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
     : Processor[DataSourceTask[Any, Any]] = {
     Processor[DataSourceTask[Any, Any]](parallelism, description,
-      taskConf.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, 
dataSource))
+      taskConf
+        .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)
+        .withValue[WindowRunner[Any, 
Any]](Constants.GEARPUMP_STREAMING_OPERATOR,
+        new DefaultWindowRunner[Any, Any](
+          Windows(PerElementWindowFunction, description = "perElementWindows"),
+          new DummyRunner[Any])))
+  }
+
+
+  case object PerElementWindowFunction extends WindowFunction {
+    override def apply[T](
+        context: WindowFunction.Context[T]): Array[Window] = {
+      Array(Window(context.timestamp, context.timestamp.plusMillis(1)))
+    }
+
+    override def isNonMerging: Boolean = true
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 74b0cc2..f93c496 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
 /**
  * Default Task container for 
[[org.apache.gearpump.streaming.source.DataSource]] that
@@ -56,7 +56,7 @@ class DataSourceTask[IN, OUT] private[source](
   private val batchSize = 
conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
 
   override def onStart(startTime: Instant): Unit = {
-    LOG.info(s"opening data source at $startTime")
+    LOG.info(s"opening data source at ${startTime.toEpochMilli}")
     source.open(context, startTime)
 
     self ! Watermark(source.getWatermark)
@@ -73,10 +73,7 @@ class DataSourceTask[IN, OUT] private[source](
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    windowRunner.trigger(watermark).foreach {
-      result =>
-        context.output(Message(result.value, result.timestamp))
-    }
+    TaskUtil.trigger(watermark, windowRunner, context)
   }
 
   override def onStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 44ec2c6..8a6f04f 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -23,12 +23,13 @@ import com.google.common.primitives.Shorts
 import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, 
Partitioner, UnicastPartitioner}
 import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
 import org.apache.gearpump.streaming.LifeTime
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.Subscription._
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, 
TimeStamp}
+import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
 
 /**
- * Manges the output and message clock for single downstream processor
+ * Manages the output and message clock for single downstream processor
  *
  * @param subscriber downstream processor
  * @param maxPendingMessageCount trigger flow control. Should be bigger than
@@ -39,8 +40,9 @@ class Subscription(
     appId: Int,
     executorId: Int,
     taskId: TaskId,
-    subscriber: Subscriber, sessionId: Int,
-    transport: ExpressTransport,
+    subscriber: Subscriber,
+    sessionId: Int,
+    publisher: TaskActor,
     maxPendingMessageCount: Int = MAX_PENDING_MESSAGE_COUNT,
     ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) {
 
@@ -55,10 +57,12 @@ class Subscription(
   // Don't worry if this store negative number. We will wrap the Short
   private val messageCount: Array[Short] = new Array[Short](parallelism)
   private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
-  private val candidateMinClockSince: Array[Short] = new 
Array[Short](parallelism)
+  private val processingWatermarkSince: Array[Short] = new 
Array[Short](parallelism)
 
-  private val minClockValue: Array[TimeStamp] = 
Array.fill(parallelism)(Long.MaxValue)
-  private val candidateMinClock: Array[TimeStamp] = 
Array.fill(parallelism)(Long.MaxValue)
+  private val outputWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+    Watermark.MIN.toEpochMilli)
+  private val processingWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+    Watermark.MIN.toEpochMilli)
 
   private var maxPendingCount: Short = 0
 
@@ -84,7 +88,7 @@ class Subscription(
 
   def start(): Unit = {
     val ackRequest = InitialAckRequest(taskId, sessionId)
-    transport.transport(ackRequest, allTasks: _*)
+    publisher.transport(ackRequest, allTasks: _*)
   }
 
   def sendMessage(msg: Message): Int = {
@@ -106,12 +110,9 @@ class Subscription(
       msg.timestamp.toEpochMilli)) {
 
       val targetTask = TaskId(processorId, partition)
-      transport.transport(msg, targetTask)
+      publisher.transport(msg, targetTask)
 
-      this.minClockValue(partition) = Math.min(this.minClockValue(partition),
-        msg.timestamp.toEpochMilli)
-      this.candidateMinClock(partition) =
-        Math.min(this.candidateMinClock(partition), msg.timestamp.toEpochMilli)
+      this.processingWatermark(partition) = 
publisher.getProcessingWatermark.toEpochMilli
 
       incrementMessageCount(partition, 1)
 
@@ -165,15 +166,9 @@ class Subscription(
 
     if (ack.sessionId == sessionId) {
       if (ack.actualReceivedNum == ack.seq) {
-        if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) {
-          if (ack.seq == messageCount(index)) {
-            // All messages have been acked.
-            minClockValue(index) = Long.MaxValue
-          } else {
-            minClockValue(index) = candidateMinClock(index)
-          }
-          candidateMinClock(index) = Long.MaxValue
-          candidateMinClockSince(index) = messageCount(index)
+        if ((ack.seq - processingWatermarkSince(index)).toShort >= 0) {
+          outputWatermark(index) = processingWatermark(index)
+          processingWatermarkSince(index) = messageCount(index)
         }
 
         pendingMessageCount(ack.taskId.index) = 
(messageCount(ack.taskId.index) - ack.seq).toShort
@@ -186,20 +181,24 @@ class Subscription(
     }
   }
 
-  def minClock: TimeStamp = {
-    minClockValue.min
+  def watermark: TimeStamp = {
+    outputWatermark.min
   }
 
   def allowSendingMoreMessages(): Boolean = {
     maxPendingCount < maxPendingMessageCount
   }
 
-  def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
-    minClockValue.indices.foreach { i =>
-      if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0
-        && allowSendingMoreMessages) {
+  def onStallingTime(stallingTime: TimeStamp): Unit = {
+    outputWatermark.indices.foreach { i =>
+      if (outputWatermark(i) == stallingTime &&
+        pendingMessageCount(i) > 0 &&
+        allowSendingMoreMessages) {
         sendAckRequest(i)
         sendLatencyProbe(i)
+      } else if (publisher.getProcessingWatermark == Watermark.MAX &&
+        pendingMessageCount(i) == 0) {
+        outputWatermark(i) = Watermark.MAX.toEpochMilli
       }
     }
   }
@@ -210,7 +209,7 @@ class Subscription(
     incrementMessageCount(partition, ackOnceEveryMessageCount)
     val targetTask = TaskId(processorId, partition)
     val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
-    transport.transport(ackRequest, targetTask)
+    publisher.transport(ackRequest, targetTask)
   }
 
   private def incrementMessageCount(partition: Int, count: Int): Unit = {
@@ -226,7 +225,7 @@ class Subscription(
   private def sendLatencyProbe(partition: Int): Unit = {
     val probeLatency = LatencyProbe(System.currentTimeMillis())
     val targetTask = TaskId(processorId, partition)
-    transport.transport(probeLatency, targetTask)
+    publisher.transport(probeLatency, targetTask)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index 90a8bff..dc80511 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -116,6 +116,11 @@ trait TaskContext {
   def upstreamMinClock: TimeStamp
 
   /**
+   * Update TaskActor with the processing progress (watermark)
+   */
+  def updateWatermark(watermark: Instant): Unit
+
+  /**
    * Logger is environment dependant, it should be provided by
    * containing environment.
    */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 8ef45f3..1b90146 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -35,7 +35,7 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.streaming.task.TaskActor._
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, 
TimeStamp}
+import org.apache.gearpump.{Message, TimeStamp}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
@@ -51,16 +51,14 @@ class TaskActor(
     val task: TaskWrapper,
     inputSerializerPool: SerializationFramework)
     extends Actor with ExpressTransport with TimeOutScheduler {
-  private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS
-  private var _minClock: TimeStamp = MIN_TIME_MILLIS
-
-  def serializerPool: SerializationFramework = inputSerializerPool
-
-  private val config = context.system.settings.config
-
+  final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
+  // Clock report interval
+  final val CLOCK_REPORT_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)
+  // Flush interval
+  final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)
   val LOG: Logger = LogUtil.getLogger(getClass,
     app = taskContextData.appId, executor = taskContextData.executorId, task = 
taskId)
-
+  private val config = context.system.settings.config
   // Metrics
   private val metricName =
     
s"app${taskContextData.appId}.processor${taskId.processorId}.task${taskId.index}"
@@ -69,41 +67,48 @@ class TaskActor(
   private val processTime = 
Metrics(context.system).histogram(s"$metricName:processTime")
   private val sendThroughput = 
Metrics(context.system).meter(s"$metricName:sendThroughput")
   private val receiveThroughput = 
Metrics(context.system).meter(s"$metricName:receiveThroughput")
-
   private val maxPendingMessageCount = 
config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT)
   private val ackOnceEveryMessageCount = config.getInt(
     GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)
-
   private val executor = context.parent
-  private var life = taskContextData.life
+  private val queue = new util.LinkedList[AnyRef]()
+  // SecurityChecker will be responsible of dropping messages from
+  // unknown sources
+  private val securityChecker = new SecurityChecker(taskId, self)
+  private val stashQueue = new util.LinkedList[MessageAndSender]()
 
   // Latency probe
 
   import context.dispatcher
-  final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
-
-  // Clock report interval
-  final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
-
-  // Flush interval
-  final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)
-
-  private val queue = new util.LinkedList[AnyRef]()
-
+  private var upstreamWatermark: Instant = Watermark.MIN
+  private var processingWatermark: Instant = Watermark.MIN
+  private var watermark: Instant = Watermark.MIN
+  private var life = taskContextData.life
   private var subscriptions = List.empty[(Int, Subscription)]
-
-  // SecurityChecker will be responsible of dropping messages from
-  // unknown sources
-  private val securityChecker = new SecurityChecker(taskId, self)
   private[task] var sessionId = NONE_SESSION
 
   // Reports to appMaster with my address
   express.registerLocalActor(TaskId.toLong(taskId), self)
 
-  final def receive: Receive = null
+  final override def postStop(): Unit = {
+    onStop()
+  }
 
   task.setTaskActor(this)
 
+  def onStop(): Unit = task.onStop()
+
+  final override def preStart(): Unit = {
+    val register = RegisterTask(taskId, taskContextData.executorId, local)
+    LOG.info(s"$register")
+    executor ! register
+    context.become(waitForTaskRegistered)
+  }
+
+  def serializerPool: SerializationFramework = inputSerializerPool
+
+  final def receive: Receive = null
+
   def onStart(startTime: Instant): Unit = {
     task.onStart(startTime)
   }
@@ -112,9 +117,6 @@ class TaskActor(
 
   def onUnManagedMessage(msg: Any): Unit = 
task.receiveUnManagedMessage.apply(msg)
 
-  def onStop(): Unit = task.onStop()
-
-
   /**
    * output to a downstream by specifying a arrayIndex
    * @param arrayIndex this is not same as ProcessorId
@@ -133,84 +135,16 @@ class TaskActor(
     sendThroughput.mark(count)
   }
 
-  final override def postStop(): Unit = {
-    onStop()
-  }
-
-  final override def preStart(): Unit = {
-    val register = RegisterTask(taskId, taskContextData.executorId, local)
-    LOG.info(s"$register")
-    executor ! register
-    context.become(waitForTaskRegistered)
-  }
-
-  private def allowSendingMoreMessages(): Boolean = {
-    subscriptions.forall(_._2.allowSendingMoreMessages())
-  }
-
-  private def doHandleMessage(): Int = {
-    var done = false
-
-    var count = 0
-
-    while (allowSendingMoreMessages() && !done) {
-      val msg = queue.poll()
-      if (msg != null) {
-        msg match {
-          case SendAck(ack, targetTask) =>
-            transport(ack, targetTask)
-          case m: Message =>
-            count += 1
-            onNext(m)
-          case other =>
-            // un-managed message
-            onUnManagedMessage(other)
-        }
-      } else {
-        done = true
-      }
-    }
-
-    count
-  }
-
-  private def onStartClock(): Unit = {
-    LOG.info(s"received start, clock: $upstreamMinClock, sessionId: 
$sessionId")
-    subscriptions = taskContextData.subscribers.map { subscriber =>
-      (subscriber.processorId,
-        new Subscription(taskContextData.appId, taskContextData.executorId, 
taskId, subscriber,
-          sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount))
-    }.sortBy(_._1)
-
-    subscriptions.foreach(_._2.start())
-
-    stashQueue.asScala.foreach { item =>
-      handleMessages(item.sender).apply(item.msg)
-    }
-    stashQueue.clear()
-
-    // Put this as the last step so that the subscription is already 
initialized.
-    // Message sending in current Task before onStart will not be delivered to
-    // target
-    onStart(Instant.ofEpochMilli(_minClock))
-
-    taskContextData.appMaster ! GetUpstreamMinClock(taskId)
-    context.become(handleMessages(sender))
-  }
-
   def waitForTaskRegistered: Receive = {
     case TaskRegistered(_, id, startClock) =>
       this.sessionId = id
-      this._minClock = startClock
-      context.become(waitForStartClock)
+      context.become(waitForStartTask(startClock))
   }
 
-  private val stashQueue = new util.LinkedList[MessageAndSender]()
-
-  def waitForStartClock: Receive = {
+  def waitForStartTask(startClock: TimeStamp): Receive = {
     case start@StartTask(tid) =>
       assert(tid == this.taskId, s"$start sent to the wrong task 
${this.taskId}")
-      onStartClock()
+      onStartTask(startClock)
     case other: AnyRef =>
       stashQueue.add(MessageAndSender(other, sender()))
   }
@@ -246,7 +180,7 @@ class TaskActor(
 
     case watermark@Watermark(instant) =>
       assert(sender.eq(self), "Watermark should only be sent from Task to 
itself")
-      onUpstreamMinClock(instant.toEpochMilli)
+      onUpstreamMinClock(instant)
       receiveMessage(watermark.toMessage, sender)
 
     case UpstreamMinClock(upstreamClock) =>
@@ -256,7 +190,7 @@ class TaskActor(
       // 3. upstreamClock is None for source task since it's reported as 
watermark above
       //    by external source
       // 4. this is designed to avoid flooding the ClockService
-      upstreamClock.foreach(onUpstreamMinClock)
+      upstreamClock.foreach(clock => 
onUpstreamMinClock(Instant.ofEpochMilli(clock)))
       reportMinClock()
 
     case ChangeTask(_, dagVersion, newLife, subscribers) =>
@@ -291,14 +225,69 @@ class TaskActor(
   }
 
   /**
-   * Returns min clock of this task
-   */
-  def minClock: TimeStamp = _minClock
-
-  /**
    * Returns min clock of upstream task
    */
-  def getUpstreamMinClock: TimeStamp = upstreamMinClock
+  def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli
+
+  def getProcessingWatermark: Instant = processingWatermark
+
+  def updateWatermark(watermark: Instant): Unit = {
+    processingWatermark = TaskUtil.max(processingWatermark, watermark)
+  }
+
+  private def allowSendingMoreMessages(): Boolean = {
+    subscriptions.forall(_._2.allowSendingMoreMessages())
+  }
+
+  private def doHandleMessage(): Int = {
+    var done = false
+
+    var count = 0
+
+    while (allowSendingMoreMessages() && !done) {
+      val msg = queue.poll()
+      if (msg != null) {
+        msg match {
+          case SendAck(ack, targetTask) =>
+            transport(ack, targetTask)
+          case m: Message =>
+            count += 1
+            onNext(m)
+          case other =>
+            // un-managed message
+            onUnManagedMessage(other)
+        }
+      } else {
+        done = true
+      }
+    }
+
+    count
+  }
+
+  private def onStartTask(startClock: TimeStamp): Unit = {
+    LOG.info(s"received start, clock: $startClock, sessionId: $sessionId")
+    subscriptions = taskContextData.subscribers.map { subscriber =>
+      (subscriber.processorId,
+        new Subscription(taskContextData.appId, taskContextData.executorId, 
taskId, subscriber,
+          sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount))
+    }.sortBy(_._1)
+
+    subscriptions.foreach(_._2.start())
+
+    stashQueue.asScala.foreach { item =>
+      handleMessages(item.sender).apply(item.msg)
+    }
+    stashQueue.clear()
+
+    // Put this as the last step so that the subscription is already 
initialized.
+    // Message sending in current Task before onStart will not be delivered to
+    // target
+    onStart(Instant.ofEpochMilli(startClock))
+
+    taskContextData.appMaster ! GetUpstreamMinClock(taskId)
+    context.become(handleMessages(sender))
+  }
 
   private def receiveMessage(msg: Message, sender: ActorRef): Unit = {
     val messageAfterCheck = securityChecker.checkMessage(msg, sender)
@@ -335,44 +324,52 @@ class TaskActor(
   *                      for other tasks, the clock comes from that reported to ClockService
    *                      by upstream tasks
    */
-  private def onUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
-    if (upstreamClock > this.upstreamMinClock) {
-      this.upstreamMinClock = upstreamClock
-      task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
+  private def onUpstreamMinClock(upstreamClock: Instant): Unit = {
+    if (upstreamClock.isAfter(this.upstreamWatermark)) {
+      this.upstreamWatermark = upstreamClock
+      task.onWatermarkProgress(upstreamWatermark)
     }
 
-    val subMinClock = subscriptions.foldLeft(MAX_TIME_MILLIS) { (min, sub) =>
-      val subMin = sub._2.minClock
-      // A subscription is holding back the _minClock;
-      // we send AckRequest to its tasks to push _minClock forward
-      if (subMin == _minClock) {
-        sub._2.sendAckRequestOnStallingTime(_minClock)
-      }
-      Math.min(min, subMin)
-    }
+    // For Task without subscriptions, this will be Watermark.MAX
+    val subWatermark = getSubscriptionWatermark(subscriptions, watermark)
 
-    _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
+    watermark = TaskUtil.max(Instant.ofEpochMilli(life.birth),
+      TaskUtil.min(upstreamWatermark,
+        TaskUtil.min(processingWatermark, subWatermark)))
 
     // Checks whether current task is dead.
-    if (_minClock > life.death) {
+    if (watermark.toEpochMilli > life.death) {
       // There will be no more message received...
       val unRegister = UnRegisterTask(taskId, taskContextData.executorId)
       executor ! unRegister
 
-    LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
+    LOG.info(s"Sending $unRegister, current watermark: $watermark, life: $life")
     }
   }
 
   private def reportMinClock(): Unit = {
-    val update = UpdateClock(taskId, _minClock)
+    val update = UpdateClock(taskId, watermark.toEpochMilli)
     context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
       taskContextData.appMaster ! update
     }
   }
+
+  private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
+    Instant.ofEpochMilli(subs.foldLeft(Watermark.MAX.toEpochMilli) {
+      case (min, (_, sub)) =>
+        val subWmk = sub.watermark
+        if (subWmk == wmk.toEpochMilli) {
+          sub.onStallingTime(subWmk)
+        }
+        Math.min(min, subWmk)
+    })
+  }
 }
 
 object TaskActor {
 
+  val NONE_SESSION: Int = -1
+
   // If the message comes from an unknown source, securityChecker will drop it
   class SecurityChecker(task_id: TaskId, self: ActorRef) {
 
@@ -381,15 +378,6 @@ object TaskActor {
     // Uses mutable HashMap for performance optimization
     private val receivedMsgCount = new IntShortHashMap()
 
-    // Tricky performance optimization to save memory.
-    // We store the session Id in the uid of ActorPath
-    // ActorPath.hashCode is same as uid.
-    private def getSessionId(actor: ActorRef): Int = {
-      // TODO: As method uid is protected in [akka] package. We
-      // are using hashCode instead of uid.
-      actor.hashCode()
-    }
-
     def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = {
       LOG.debug(s"Handle InitialAckRequest for session $ackRequest")
       val sessionId = ackRequest.sessionId
@@ -431,13 +419,20 @@ object TaskActor {
         }
       }
     }
+
+    // Tricky performance optimization to save memory.
+    // We store the session Id in the uid of ActorPath
+    // ActorPath.hashCode is same as uid.
+    private def getSessionId(actor: ActorRef): Int = {
+      // TODO: As method uid is protected in [akka] package. We
+      // are using hashCode instead of uid.
+      actor.hashCode()
+    }
   }
 
   case class SendAck(ack: Ack, targetTask: TaskId)
 
-  case object FLUSH
-
-  val NONE_SESSION: Int = -1
-
   case class MessageAndSender(msg: AnyRef, sender: ActorRef)
+
+  case object FLUSH
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
index 7459c64..bd889c4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
@@ -18,6 +18,11 @@
 
 package org.apache.gearpump.streaming.task
 
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+
 object TaskUtil {
 
   /**
@@ -30,4 +35,29 @@ object TaskUtil {
     val loader = Thread.currentThread().getContextClassLoader()
     loader.loadClass(className).asSubclass(classOf[Task])
   }
+
+  def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT],
+      context: TaskContext): Unit = {
+    val triggeredOutputs = runner.trigger(watermark)
+    context.updateWatermark(triggeredOutputs.watermark)
+    triggeredOutputs.outputs.foreach { case TimestampedValue(v, t) =>
+      context.output(Message(v, t))
+    }
+  }
+
+  /**
+   * @return t1 if t1 is not larger than t2 and t2 otherwise
+   */
+  def min(t1: Instant, t2: Instant): Instant = {
+    if (t1.isAfter(t2)) t2
+    else t1
+  }
+
+  /**
+   * @return t1 if t1 is not smaller than t2 and t2 otherwise
+   */
+  def max(t1: Instant, t2: Instant): Instant = {
+    if (t2.isBefore(t1)) t1
+    else t2
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index f5f099c..82cae96 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -111,6 +111,10 @@ class TaskWrapper(
     actor.getUpstreamMinClock
   }
 
+  override def updateWatermark(watermark: Instant): Unit = {
+    actor.updateWatermark(watermark)
+  }
+
  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = {
     val dispatcher = actor.context.system.dispatcher
    actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 6b66f01..0bb4d6a 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,7 +22,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner}
 import org.mockito.{Matchers => MockitoMatchers}
 import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
@@ -48,9 +48,11 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with
       task.onNext(message)
       verify(windowRunner).process(TimestampedValue(value, time))
 
-      when(windowRunner.trigger(watermark)).thenReturn(Some(TimestampedValue(value, time)))
+      when(windowRunner.trigger(watermark)).thenReturn(
+        TriggeredOutputs(Some(TimestampedValue(value, time)), watermark))
       task.onWatermarkProgress(watermark)
       verify(context).output(message)
+      verify(context).updateWatermark(watermark)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
index 98e9919..b23d0ee 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -50,7 +50,7 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
       new FoldRunner[KV, Option[KV]](reduce, "reduce"))
 
    data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
-    windowRunner.trigger(Watermark.MAX).toList shouldBe
+    windowRunner.trigger(Watermark.MAX).outputs.toList shouldBe
       List(
         TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)),
         TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(18)),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index f7a3a63..d62739a 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -63,12 +63,13 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
         when(dataSource.read()).thenReturn(msg)
 
         when(runner.trigger(Watermark.MAX)).thenReturn(
-          Some(TimestampedValue(str.asInstanceOf[Any], timestamp)))
+          TriggeredOutputs(Some(TimestampedValue(str.asInstanceOf[Any], timestamp)), Watermark.MAX))
 
         sourceTask.onNext(Message("next"))
         sourceTask.onWatermarkProgress(Watermark.MAX)
 
         verify(taskContext).output(msg)
+        verify(taskContext).updateWatermark(Watermark.MAX)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83e1eb63/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 65cb17a..285bf44 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.Message
+import org.apache.gearpump.{MIN_TIME_MILLIS, Message}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.source.Watermark
@@ -47,40 +47,42 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
  val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism,
     downstreamProcessor.life)
 
-  private def prepare: (Subscription, ExpressTransport) = {
-    val transport = mock[ExpressTransport]
-    val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport)
+  private def prepare: (Subscription, TaskActor) = {
+    val sender = mock[TaskActor]
+
+    val subscription = new Subscription(appId, executorId, taskId, subscriber, session, sender)
     subscription.start()
 
     val expectedAckRequest = InitialAckRequest(taskId, session)
-    verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1))
+    verify(sender, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1))
 
-    (subscription, transport)
+    (subscription, sender)
   }
 
   it should "not send any more message when its life ends" in {
-    val (subscription, transport) = prepare
+    val (subscription, _) = prepare
     subscription.changeLife(LifeTime(0, 0))
     val count = subscription.sendMessage(Message("some"))
     assert(count == 0)
   }
 
   it should "send message and handle ack correctly" in {
-    val (subscription, transport) = prepare
+    val (subscription, sender) = prepare
     val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
+    when(sender.getProcessingWatermark).thenReturn(msg1.timestamp)
     subscription.sendMessage(msg1)
 
-    verify(transport, times(1)).transport(msg1, TaskId(1, 1))
-    assert(subscription.minClock == 70)
+    verify(sender, times(1)).transport(msg1, TaskId(1, 1))
+    assert(subscription.watermark == MIN_TIME_MILLIS)
 
     val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
+    when(sender.getProcessingWatermark).thenReturn(msg2.timestamp)
     subscription.sendMessage(msg2)
-    verify(transport, times(1)).transport(msg2, TaskId(1, 0))
 
-    // minClock has been set to smaller one
-    assert(subscription.minClock == 50)
+    verify(sender, times(1)).transport(msg2, TaskId(1, 0))
+    assert(subscription.watermark == MIN_TIME_MILLIS)
 
-    val initialMinClock = subscription.minClock
+    val initialMinClock = subscription.watermark
 
     // Acks initial AckRequest(0)
     subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session))
@@ -88,46 +90,39 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 
     // Sends 100 messages
     100 until 200 foreach { clock =>
+      when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(clock),
+        Instant.ofEpochMilli(clock))
       subscription.sendMessage(Message("1", clock))
       subscription.sendMessage(Message("2", clock))
     }
 
-    // Ack not received, minClock no change
-    assert(subscription.minClock == initialMinClock)
+    assert(subscription.watermark == 50)
 
     subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session))
     subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session))
 
     // Ack received, minClock changed
-    assert(subscription.minClock > initialMinClock)
+    assert(subscription.watermark > initialMinClock)
 
     // Expects to receive two ackRequest for two downstream tasks
     val ackRequestForTask0 = AckRequest(taskId, 200, session)
-    verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0))
+    verify(sender, times(1)).transport(ackRequestForTask0, TaskId(1, 0))
 
     val ackRequestForTask1 = AckRequest(taskId, 200, session)
-    verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
+    verify(sender, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
   }
 
   it should "disallow more message sending if there is no ack back" in {
-    val (subscription, transport) = prepare
+    val (subscription, sender) = prepare
     // send 100 messages
     0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock =>
+      when(sender.getProcessingWatermark).thenReturn(Watermark.MAX)
       subscription.sendMessage(Message(randomMessage, clock))
     }
 
     assert(!subscription.allowSendingMoreMessages())
   }
 
-  it should "report minClock as Long.MaxValue when there is no pending message" in {
-    val (subscription, _) = prepare
-    val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
-    subscription.sendMessage(msg1)
-    assert(subscription.minClock == 70)
-    subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
-    assert(subscription.minClock == Watermark.MAX.toEpochMilli)
-  }
-
   private def randomMessage: String = new Random().nextInt.toString
 }
 


Reply via email to