Repository: incubator-gearpump
Updated Branches:
  refs/heads/master b6f5ccd6e -> f96aca995


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 1fb61bd..fb2aaed 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.source.Watermark
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
@@ -35,7 +37,6 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.streaming.task.TaskActor._
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{Message, TimeStamp}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
@@ -141,7 +142,7 @@ class TaskActor(
       context.become(waitForStartTask(startClock))
   }
 
-  def waitForStartTask(startClock: TimeStamp): Receive = {
+  def waitForStartTask(startClock: MilliSeconds): Receive = {
     case start@StartTask(tid) =>
       assert(tid == this.taskId, s"$start sent to the wrong task 
${this.taskId}")
       onStartTask(startClock)
@@ -227,7 +228,7 @@ class TaskActor(
   /**
    * Returns min clock of upstream task
    */
-  def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli
+  def getUpstreamMinClock: MilliSeconds = upstreamWatermark.toEpochMilli
 
   def getProcessingWatermark: Instant = processingWatermark
 
@@ -265,7 +266,7 @@ class TaskActor(
     count
   }
 
-  private def onStartTask(startClock: TimeStamp): Unit = {
+  private def onStartTask(startClock: MilliSeconds): Unit = {
     LOG.info(s"received start, clock: $startClock, sessionId: $sessionId")
     subscriptions = taskContextData.subscribers.map { subscriber =>
       (subscriber.processorId,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index c2e2faa..4ba9315 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.task
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.ProcessorId
 
 /*
@@ -42,25 +42,25 @@ case class Ack(taskId: TaskId, seq: Short, 
actualReceivedNum: Short, sessionId:
 
 sealed trait ClockEvent
 
-case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent
+case class UpdateClock(taskId: TaskId, time: MilliSeconds) extends ClockEvent
 
 object GetLatestMinClock extends ClockEvent
 
 case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent
 
-case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends 
ClockEvent
+case class UpdateCheckpointClock(taskId: TaskId, clock: MilliSeconds) extends 
ClockEvent
 
 case object GetCheckpointClock extends ClockEvent
 
-case class CheckpointClock(clock: Option[TimeStamp])
+case class CheckpointClock(clock: Option[MilliSeconds])
 
-case class UpstreamMinClock(latestMinClock: Option[TimeStamp])
+case class UpstreamMinClock(latestMinClock: Option[MilliSeconds])
 
-case class LatestMinClock(clock: TimeStamp)
+case class LatestMinClock(clock: MilliSeconds)
 
 case object GetStartClock
 
-case class StartClock(clock: TimeStamp)
+case class StartClock(clock: MilliSeconds)
 
 case object EndingClock
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 82cae96..1e4430b 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -23,10 +23,11 @@ import java.time.Instant
 import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor._
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import org.slf4j.Logger
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
 
 /**
  * This provides TaskContext for user defined tasks
@@ -107,7 +108,7 @@ class TaskWrapper(
     task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler)
   }
 
-  override def upstreamMinClock: TimeStamp = {
+  override def upstreamMinClock: MilliSeconds = {
     actor.getUpstreamMinClock
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
index 1ef255e..8d026db 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.transaction.api
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * CheckpointStore persistently stores mapping of timestamp to checkpoint
@@ -27,9 +27,9 @@ import org.apache.gearpump.TimeStamp
  */
 trait CheckpointStore {
 
-  def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit
+  def persist(timeStamp: MilliSeconds, checkpoint: Array[Byte]): Unit
 
-  def recover(timestamp: TimeStamp): Option[Array[Byte]]
+  def recover(timestamp: MilliSeconds): Option[Array[Byte]]
 
   def close(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
index 2ddca3a..856b1c5 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.streaming.transaction.api
 
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
- * TimeStampFilter filters out messages that are obsolete.
+ * TimeStampFilter filters out messages that are obsolete.
  */
 trait TimeStampFilter extends java.io.Serializable {
-  def filter(msg: Message, predicate: TimeStamp): Option[Message]
+  def filter(msg: Message, predicate: MilliSeconds): Option[Message]
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index c223a53..d3bd51b 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, 
ProcessorDescription, Proce
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
 
@@ -238,7 +238,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
 
     // Step11: Tell ClockService to update DAG.
     clockService.expectMsgType[ChangeToNewDAG]
-    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, 
TimeStamp]))
+    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, 
MilliSeconds]))
 
     // Step12: start all tasks
     import scala.concurrent.duration._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
index 9e42e85..bf50fad 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.transaction.api.CheckpointStore
 
 class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
@@ -34,7 +34,7 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
   val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)
   property("CheckpointManager should recover from CheckpointStore") {
     forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)
@@ -47,7 +47,7 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
   property("CheckpointManager should write checkpoint to CheckpointStore") {
     val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
     forAll(timestampGen, checkpointIntervalGen, checkpointGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long, checkpoint: 
Array[Byte]) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long, checkpoint: 
Array[Byte]) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)
@@ -70,7 +70,7 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
 
   property("CheckpointManager should update checkpoint time according to max 
message timestamp") {
     forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
index 4cdff95..41b0624 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
@@ -26,7 +26,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.state.api.{Monoid, Serializer}
 
 class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
@@ -35,7 +35,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with
 
   property("NonWindowState should recover checkpointed state at given 
timestamp") {
     forAll(longGen) {
-      (timestamp: TimeStamp) =>
+      (timestamp: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
         val bytes = Array.empty[Byte]
@@ -61,7 +61,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with
 
   property("NonWindowState checkpoints state") {
     forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
+      (checkpointTime: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
 
@@ -95,7 +95,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with
 
   property("NonWindowState updates state") {
     forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
+      (checkpointTime: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
index d9282ae..9975f49 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
@@ -23,7 +23,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class WindowSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
 
@@ -32,7 +32,7 @@ class WindowSpec extends PropSpec with PropertyChecks with 
Matchers with Mockito
   val timestampGen = Gen.chooseNum[Long](0L, 1000L)
   property("Window should only slide when time passes window end") {
     forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+      (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
         val window = new Window(windowSize, windowStep)
         window.shouldSlide shouldBe false
         window.update(timestamp)
@@ -42,7 +42,7 @@ class WindowSpec extends PropSpec with PropertyChecks with 
Matchers with Mockito
 
   property("Window should slide by one or to given timestamp") {
     forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+      (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
         val window = new Window(windowSize, windowStep)
         window.range shouldBe(0L, windowSize)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
index 299a626..2b784bf 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
@@ -18,16 +18,15 @@
 
 package org.apache.gearpump.streaming.state.impl
 
+import org.apache.gearpump.Time.MilliSeconds
+
 import scala.collection.immutable.TreeMap
 import scala.util.Success
-
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump._
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.{Group, Serializer}
 
@@ -74,7 +73,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
   }
 
   property("WindowState checkpoints") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
+    forAll(longGen) { (checkpointTime: MilliSeconds) =>
       val window = mock[Window]
       val taskContext = MockUtil.mockTaskContext
       val group = mock[Group[AnyRef]]
@@ -120,7 +119,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
   }
 
   property("WindowState updates state") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
+    forAll(longGen) { (checkpointTime: MilliSeconds) =>
       val window = mock[Window]
       val taskContext = MockUtil.mockTaskContext
       val group = mock[Group[AnyRef]]
@@ -205,7 +204,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
 
   property("WindowState gets interval for timestamp") {
     forAll(longGen, longGen, longGen, longGen) {
-      (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, 
windowStep: Long) =>
+      (timestamp: MilliSeconds, checkpointTime: MilliSeconds, windowSize: 
Long, windowStep: Long) =>
         val windowManager = new Window(windowSize, windowStep)
         val taskContext = MockUtil.mockTaskContext
         val group = mock[Group[AnyRef]]
@@ -225,8 +224,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
         interval.endTime shouldBe nextInterval.startTime
     }
 
-    def intervalSpec(interval: Interval, timestamp: TimeStamp,
-        checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = 
{
+    def intervalSpec(interval: Interval, timestamp: MilliSeconds,
+        checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long): 
Unit = {
       interval.startTime should be <= interval.endTime
       timestamp / windowStep * windowStep should (be <= interval.startTime)
       (timestamp - windowSize) / windowStep * windowStep should (be <= 
interval.startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 285bf44..b05befa 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.source.Watermark
@@ -73,14 +73,14 @@ class SubscriptionSpec extends FlatSpec with Matchers with 
MockitoSugar {
     subscription.sendMessage(msg1)
 
     verify(sender, times(1)).transport(msg1, TaskId(1, 1))
-    assert(subscription.watermark == MIN_TIME_MILLIS)
+    assert(subscription.watermark == Time.MIN_TIME_MILLIS)
 
     val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
     when(sender.getProcessingWatermark).thenReturn(msg2.timestamp)
     subscription.sendMessage(msg2)
 
     verify(sender, times(1)).transport(msg2, TaskId(1, 0))
-    assert(subscription.watermark == MIN_TIME_MILLIS)
+    assert(subscription.watermark == Time.MIN_TIME_MILLIS)
 
     val initialMinClock = subscription.watermark
 

Reply via email to