Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c27cd808c -> ce3b82251


[GEARPUMP-268] Accept java.time.Instant in Message

Author: manuzhang <[email protected]>

Closes #142 from manuzhang/instant.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ce3b8225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ce3b8225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ce3b8225

Branch: refs/heads/master
Commit: ce3b822513ef67d485d41fd5817e0cbeca981e03
Parents: c27cd80
Author: manuzhang <[email protected]>
Authored: Thu Feb 9 20:26:43 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Thu Feb 9 20:26:59 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/Message.scala     | 47 ++++++++++++++++++--
 .../cluster/client/RunningApplication.scala     |  2 +-
 .../state/processor/CountProcessor.scala        |  2 +-
 .../processor/NumberGeneratorProcessor.scala    |  2 +-
 .../processor/WindowAverageProcessor.scala      |  2 +-
 .../examples/wordcountjava/dsl/WordCount.java   |  2 +-
 .../streaming/examples/wordcount/Split.scala    |  4 +-
 .../wordcount/dsl/WindowedWordCount.scala       | 14 +++---
 .../akkastream/task/DelayInitialTask.scala      |  2 +-
 .../akkastream/task/DropWithinTask.scala        |  2 +-
 .../akkastream/task/SingleSourceTask.scala      |  6 ---
 .../akkastream/task/SourceBridgeTask.scala      |  2 +-
 .../akkastream/task/StatefulMapConcatTask.scala |  2 +-
 .../akkastream/task/TakeWithinTask.scala        |  2 +-
 .../akkastream/task/TickSourceTask.scala        |  2 +-
 .../storm/topology/GearpumpStormComponent.scala |  2 +-
 .../storm/util/StormOutputCollector.scala       |  8 ++--
 .../hadoop/HadoopCheckpointStore.scala          |  5 ++-
 .../lib/format/DefaultSequenceFormatter.scala   |  2 +-
 .../hadoop/lib/rotation/FileSizeRotation.scala  |  4 +-
 .../hadoop/lib/rotation/Rotation.scala          |  4 +-
 .../lib/rotation/FileSizeRotationSpec.scala     |  7 +--
 .../kafka/lib/source/AbstractKafkaSource.scala  |  5 ++-
 .../lib/source/DefaultKafkaMessageDecoder.scala |  2 +-
 .../source/DefaultKafkaMessageDecoderSpec.scala |  6 +--
 .../streaming/dsl/scalaapi/StreamApp.scala      |  2 +-
 .../streaming/dsl/task/TransformTask.scala      |  4 +-
 .../streaming/dsl/window/impl/Window.scala      |  2 +-
 .../dsl/window/impl/WindowRunner.scala          |  2 +-
 .../gearpump/streaming/source/Watermark.scala   |  2 +-
 .../streaming/state/api/PersistentTask.scala    |  2 +-
 .../streaming/task/ExpressTransport.scala       |  2 +-
 .../gearpump/streaming/task/Subscription.scala  |  7 +--
 .../streaming/task/SubscriptionSpec.scala       |  8 ++--
 34 files changed, 104 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala 
b/core/src/main/scala/org/apache/gearpump/Message.scala
index 871ebe1..9965565 100644
--- a/core/src/main/scala/org/apache/gearpump/Message.scala
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -18,15 +18,56 @@
 
 package org.apache.gearpump
 
+import java.time.Instant
+
 /**
- * Each message contains a immutable timestamp.
+ * Each message contains an immutable timestamp.
  *
  * For example, if you take a picture, the time you take the picture is the
  * message's timestamp.
+ *
  * @param msg Accept any type except Null, Nothing and Unit
  */
-case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp)
+case class Message(msg: Any, timeInMillis: TimeStamp) {
+
+  /**
+   * @param msg Accept any type except Null, Nothing and Unit
+   * @param timestamp timestamp cannot be larger than 
Instant.ofEpochMilli(Long.MaxValue)
+   */
+  def this(msg: Any, timestamp: Instant) = {
+    this(msg, timestamp.toEpochMilli)
+  }
+
+  /**
+   * Instant.EPOCH is used for default timestamp
+   *
+   * @param msg Accept any type except Null, Nothing and Unit
+   */
+  def this(msg: Any) = {
+    this(msg, Instant.EPOCH)
+  }
+
+  def timestamp: Instant = {
+    Instant.ofEpochMilli(timeInMillis)
+  }
+}
 
 object Message {
-  val noTimeStamp: TimeStamp = 0L
+
+  /**
+   * Instant.EPOCH is used for default timestamp
+   *
+   * @param msg Accept any type except Null, Nothing and Unit
+   */
+  def apply(msg: Any): Message = {
+    new Message(msg)
+  }
+
+  /**
+   * @param msg Accept any type except Null, Nothing and Unit
+   * @param timestamp timestamp cannot be larger than 
Instant.ofEpochMilli(Long.MaxValue)
+   */
+  def apply(msg: Any, timestamp: Instant): Message = {
+    new Message(msg, timestamp)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
index 973e1e8..1c6c959 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.cluster.client
 
-import akka.actor.{ActorRef, ActorSystem}
+import akka.actor.ActorRef
 import akka.pattern.ask
 import akka.util.Timeout
 import org.apache.gearpump.cluster.ClientToMaster.{RegisterAppResultListener, 
ResolveAppId, ShutdownApplication}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
index 2d31eeb..4efc6e1 100644
--- 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -37,7 +37,7 @@ class CountProcessor(taskContext: TaskContext, conf: 
UserConfig)
   }
 
   override def processMessage(state: PersistentState[Int], message: Message): 
Unit = {
-    state.update(message.timestamp, 1)
+    state.update(message.timestamp.toEpochMilli, 1)
     state.get.foreach(s => taskContext.output(Message(serializer.serialize(s), 
message.timestamp)))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
index e6030d6..eb927c6 100644
--- 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -36,7 +36,7 @@ class NumberGeneratorProcessor(taskContext: TaskContext, 
conf: UserConfig)
   }
 
   override def onNext(msg: Message): Unit = {
-    output(Message(num + "", num))
+    output(Message(num + "", Instant.ofEpochMilli(num)))
     num += 1
 
     import scala.concurrent.duration._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
index eea2504..8ddbedd 100644
--- 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala
@@ -49,6 +49,6 @@ class WindowAverageProcessor(taskContext: TaskContext, conf: 
UserConfig)
   override def processMessage(state: PersistentState[AveragedValue],
       message: Message): Unit = {
     val value = AveragedValue(message.msg.asInstanceOf[String].toLong)
-    state.update(message.timestamp, value)
+    state.update(message.timestamp.toEpochMilli, value)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 2942861..7d8400d 100644
--- 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -79,7 +79,7 @@ public class WordCount {
 
     @Override
     public Message read() {
-      return Message.apply(str, Instant.now().toEpochMilli());
+      return Message.apply(str, Instant.now());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index c07e124..4bb28cd 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -43,10 +43,10 @@ class Split extends DataSource {
 
     if (item < result.size - 1) {
       item += 1
-      Message(result(item), System.currentTimeMillis())
+      Message(result(item), Instant.now())
     } else {
       item = 0
-      Message(result(item), System.currentTimeMillis())
+      Message(result(item), Instant.now())
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index e1aac4c..2aa1bb4 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -52,12 +52,12 @@ object WindowedWordCount extends AkkaApp with 
ArgumentsParser {
   private class TimedDataSource extends DataSource {
 
     private var data = List(
-      Message("foo", 1L),
-      Message("bar", 2L),
-      Message("foo", 3L),
-      Message("foo", 5L),
-      Message("bar", 7L),
-      Message("bar", 8L)
+      Message("foo", Instant.ofEpochMilli(1L)),
+      Message("bar", Instant.ofEpochMilli(2L)),
+      Message("foo", Instant.ofEpochMilli(3L)),
+      Message("foo", Instant.ofEpochMilli(5L)),
+      Message("bar", Instant.ofEpochMilli(7L)),
+      Message("bar", Instant.ofEpochMilli(8L))
     )
 
     private var watermark: Instant = Instant.ofEpochMilli(0)
@@ -66,7 +66,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser 
{
       if (data.nonEmpty) {
         val msg = data.head
         data = data.tail
-        watermark = Instant.ofEpochMilli(msg.timestamp)
+        watermark = msg.timestamp
         msg
       } else {
         null

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
index 7c335dc..ae91d1f 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -39,7 +39,7 @@ class DelayInitialTask[T](context: TaskContext, userConf : 
UserConfig)
 
   override def onStart(startTime: Instant): Unit = {
     context.scheduleOnce(delayInitial)(
-      self ! Message(DelayInitialTime, System.currentTimeMillis())
+      self ! Message(DelayInitialTime, Instant.now())
     )
   }
   override def onNext(msg : Message) : Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
index 0c54829..4c19de5 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -38,7 +38,7 @@ class DropWithinTask[T](context: TaskContext, userConf : 
UserConfig)
 
   override def onStart(startTime: Instant): Unit = {
     context.scheduleOnce(timeout)(
-      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+      self ! Message(DropWithinTimeout, Instant.now())
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
index 458bb4e..5bea47e 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -18,16 +18,10 @@
 
 package org.apache.gearpump.akkastream.task
 
-import java.time.Instant
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.task.TaskContext
 
-import scala.concurrent.duration.FiniteDuration
-
 class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
   extends GraphTask(context, userConf) {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
index 054b483..5b64a52 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -64,7 +64,7 @@ class SourceBridgeTask(taskContext : TaskContext, userConf : 
UserConfig)
       LOG.error("the stream has error", ex)
     case AkkaStreamMessage(msg) =>
       LOG.info("we have received message from akka stream source: " + msg)
-      taskContext.output(Message(msg, System.currentTimeMillis()))
+      taskContext.output(Message(msg, Instant.now()))
     case Complete(description) =>
       LOG.info("the stream is completed: " + description)
     case msg =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
index a0674bc..b776f2c 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -40,7 +40,7 @@ class StatefulMapConcatTask[IN, OUT](context: TaskContext, 
userConf : UserConfig
     val iterator = out.iterator
     while(iterator.hasNext) {
       val nextValue = iterator.next
-      context.output(Message(nextValue, System.currentTimeMillis()))
+      context.output(Message(nextValue, Instant.now()))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
index 9559d8f..7aa4e8e 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -38,7 +38,7 @@ class TakeWithinTask[T](context: TaskContext, userConf : 
UserConfig)
 
   override def onStart(startTime: Instant): Unit = {
     context.scheduleOnce(timeout)(
-      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+      self ! Message(DropWithinTimeout, Instant.now())
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
index d99d2db..a10e138 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -40,7 +40,7 @@ class TickSourceTask[T](context: TaskContext, userConf : 
UserConfig)
 
   override def onStart(startTime: Instant): Unit = {
     context.schedule(initialDelay, interval)(
-      self ! Message(tick, System.currentTimeMillis())
+      self ! Message(tick, Instant.now())
     )
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index a8e061c..4536277 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -217,7 +217,7 @@ object GearpumpStormComponent {
     }
 
     override def next(message: Message): Unit = {
-      val timestamp = message.timestamp
+      val timestamp = message.timestamp.toEpochMilli
       collector.setTimestamp(timestamp)
       
bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext,
         timestamp))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
index f95b840..1b239b5 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -18,16 +18,16 @@
 
 package org.apache.gearpump.experiments.storm.util
 
+import java.time.Instant
 import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => 
JList, Map => JMap}
-import scala.collection.JavaConverters._
 
+import scala.collection.JavaConverters._
 import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject}
 import backtype.storm.grouping.CustomStreamGrouping
 import backtype.storm.task.TopologyContext
 import backtype.storm.tuple.Fields
 import backtype.storm.utils.Utils
 import org.slf4j.Logger
-
 import org.apache.gearpump._
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.experiments.storm.util.StormUtil._
@@ -182,7 +182,7 @@ class StormOutputCollector(
     if (targets.containsKey(streamId)) {
       val (targetPartitions, targetStormTaskIds) = 
getTargetPartitionsFn(streamId, values)
       val tuple = new GearpumpTuple(values, stormTaskId, streamId, 
targetPartitions)
-      taskContext.output(Message(tuple, timestamp))
+      taskContext.output(Message(tuple, Instant.ofEpochMilli(timestamp)))
       targetStormTaskIds
     } else {
       EMPTY_LIST
@@ -206,7 +206,7 @@ class StormOutputCollector(
       val partition = stormTaskIdToGearpump(id).index
       val targetPartitions = Map(target -> Array(partition))
       val tuple = new GearpumpTuple(values, stormTaskId, streamId, 
targetPartitions)
-      taskContext.output(Message(tuple, timestamp))
+      taskContext.output(Message(tuple, Instant.ofEpochMilli(timestamp)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
index a18cce6..e26a2ee 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
@@ -18,10 +18,11 @@
 
 package org.apache.gearpump.streaming.hadoop
 
+import java.time.Instant
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.slf4j.Logger
-
 import org.apache.gearpump.TimeStamp
 import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation
 import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, 
HadoopCheckpointStoreWriter}
@@ -82,7 +83,7 @@ class HadoopCheckpointStore(
 
     curWriter.foreach { w =>
       val offset = w.write(timestamp, checkpoint)
-      rotation.mark(timestamp, offset)
+      rotation.mark(Instant.ofEpochMilli(timestamp), offset)
     }
 
     if (rotation.shouldRotate) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
index 318c071..822eb5f 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.io.{LongWritable, Text, Writable}
 import org.apache.gearpump.Message
 
 class DefaultSequenceFormatter extends OutputFormatter {
-  override def getKey(message: Message): Writable = new 
LongWritable(message.timestamp)
+  override def getKey(message: Message): Writable = new 
LongWritable(message.timestamp.toEpochMilli)
 
   override def getValue(message: Message): Writable = new 
Text(message.msg.asInstanceOf[String])
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
index 72be9c3..285b8ef 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
@@ -18,13 +18,13 @@
 
 package org.apache.gearpump.streaming.hadoop.lib.rotation
 
-import org.apache.gearpump.TimeStamp
+import java.time.Instant
 
 case class FileSizeRotation(maxBytes: Long) extends Rotation {
 
   private var bytesWritten = 0L
 
-  override def mark(timestamp: TimeStamp, offset: Long): Unit = {
+  override def mark(timestamp: Instant, offset: Long): Unit = {
     bytesWritten = offset
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
index cd8c04a..20de3d7 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
@@ -18,10 +18,10 @@
 
 package org.apache.gearpump.streaming.hadoop.lib.rotation
 
-import org.apache.gearpump.TimeStamp
+import java.time.Instant
 
 trait Rotation extends Serializable {
-  def mark(timestamp: TimeStamp, offset: Long): Unit
+  def mark(timestamp: Instant, offset: Long): Unit
   def shouldRotate: Boolean
   def rotate(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
 
b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
index 4eab3c9..a469956 100644
--- 
a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
+++ 
b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -18,10 +18,11 @@
 
 package org.apache.gearpump.streaming.hadoop.lib.rotation
 
+import java.time.Instant
+
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-
 import org.apache.gearpump.TimeStamp
 
 class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
@@ -33,9 +34,9 @@ class FileSizeRotationSpec extends PropSpec with 
PropertyChecks with Matchers {
     forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) 
=>
       val rotation = new FileSizeRotation(fileSize)
       rotation.shouldRotate shouldBe false
-      rotation.mark(timestamp, rotation.maxBytes / 2)
+      rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes / 2)
       rotation.shouldRotate shouldBe false
-      rotation.mark(timestamp, rotation.maxBytes)
+      rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes)
       rotation.shouldRotate shouldBe true
       rotation.rotate
       rotation.shouldRotate shouldBe false

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index ba49899..a8bda50 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -130,8 +130,9 @@ abstract class AbstractKafkaSource(
     msg
   }
 
-  private def checkpointOffsets(tp: TopicAndPartition, time: TimeStamp, 
offset: Long): Unit = {
-    checkpointStores.get(tp).foreach(_.persist(time, Injection[Long, 
Array[Byte]](offset)))
+  private def checkpointOffsets(tp: TopicAndPartition, time: Instant, offset: 
Long): Unit = {
+    checkpointStores.get(tp).foreach(_.persist(time.toEpochMilli,
+      Injection[Long, Array[Byte]](offset)))
   }
 
   private def maybeSetupCheckpointStores(tps: Array[TopicAndPartition]): Unit 
= {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
index 5e13230..2b63967 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
@@ -27,7 +27,7 @@ class DefaultKafkaMessageDecoder extends KafkaMessageDecoder {
 
   override def fromBytes(key: Array[Byte], value: Array[Byte]): MessageAndWatermark = {
     val time = Instant.now()
-    MessageAndWatermark(Message(value, time.toEpochMilli), time)
+    MessageAndWatermark(Message(value, time), time)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
index 81b2661..9f29022 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.kafka.lib.source
 
+import java.time.Instant
+
 import com.twitter.bijection.Injection
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
@@ -29,14 +31,12 @@ class DefaultKafkaMessageDecoderSpec extends PropSpec with PropertyChecks with M
     forAll(Gen.chooseNum[Int](0, 100), Gen.alphaStr) { (k: Int, v: String) =>
       val kbytes = Injection[Int, Array[Byte]](k)
       val vbytes = Injection[String, Array[Byte]](v)
-      val timestamp = System.currentTimeMillis()
       val msgAndWmk = decoder.fromBytes(kbytes, vbytes)
       val message = msgAndWmk.message
       val watermark = msgAndWmk.watermark
       message.msg shouldBe vbytes
       // processing time as message timestamp and watermark
-      message.timestamp shouldBe watermark.toEpochMilli
-      message.timestamp should be >= timestamp
+      message.timestamp shouldBe watermark
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index d6eed2e..52972b7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -97,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
 
   override def read(): Message = {
     if (iterator.hasNext) {
-      Message(iterator.next(), Instant.now().toEpochMilli)
+      Message(iterator.next(), Instant.now())
     } else {
       null
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 5febeb6..c4278dd 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -43,10 +43,10 @@ class TransformTask[IN, OUT](operator: Option[FunctionRunner[IN, OUT]],
     operator match {
       case Some(op) =>
         op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-          taskContext.output(new Message(msg, time))
+          taskContext.output(Message(msg, time))
         }
       case None =>
-        taskContext.output(new Message(msg.msg, time))
+        taskContext.output(Message(msg.msg, time))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 3e9d8fb..e1d9cee 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -89,7 +89,7 @@ case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]
     val group = groupByFn(ele)
     val windows = window.windowFn(new WindowFunction.Context[T] {
       override def element: T = ele
-      override def timestamp: Instant = Instant.ofEpochMilli(message.timestamp)
+      override def timestamp: Instant = message.timestamp
     })
     windows.map(WindowAndGroup(_, group)).toList
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 7013645..1ada42c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -128,7 +128,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
     }
 
     def emitResult(result: OUT, time: Instant): Unit = {
-      taskContext.output(Message(result, time.toEpochMilli))
+      taskContext.output(Message(result, time))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 36099c1..912bb12 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -25,5 +25,5 @@ import org.apache.gearpump.Message
  * message used by source task to report source watermark.
  */
 case class Watermark(instant: Instant) {
-  def toMessage: Message = Message("watermark", instant.toEpochMilli)
+  def toMessage: Message = Message("watermark", instant)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index 5eaed40..f9f5b33 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -79,7 +79,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
   }
 
   final override def onNext(message: Message): Unit = {
-    checkpointManager.update(message.timestamp)
+    checkpointManager.update(message.timeInMillis)
       .foreach(state.setNextCheckpointTime)
     processMessage(state, message)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
index 7629cf9..3cc1cd0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
@@ -56,7 +56,7 @@ trait ExpressTransport {
           msg match {
             case message: Message =>
               val bytes = serializerPool.get().serialize(message.msg)
-              serializedMessage = SerializedMessage(message.timestamp, bytes)
+              serializedMessage = SerializedMessage(message.timeInMillis, bytes)
             case _ => serializedMessage = msg
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 4193fbf..3e68580 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -103,13 +103,14 @@ class Subscription(
 
     var count = 0
     // Only sends message whose timestamp matches the lifeTime
-    if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timestamp)) {
+    if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timeInMillis)) {
 
       val targetTask = TaskId(processorId, partition)
       transport.transport(msg, targetTask)
 
-      this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timestamp)
-      this.candidateMinClock(partition) = Math.min(this.candidateMinClock(partition), msg.timestamp)
+      this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timeInMillis)
+      this.candidateMinClock(partition) =
+        Math.min(this.candidateMinClock(partition), msg.timeInMillis)
 
       incrementMessageCount(partition, 1)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index d128ace..3635db9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -18,12 +18,12 @@
 
 package org.apache.gearpump.streaming.task
 
+import java.time.Instant
 import java.util.Random
 
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
@@ -66,13 +66,13 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 
   it should "send message and handle ack correctly" in {
     val (subscription, transport) = prepare
-    val msg1 = new Message("1", timestamp = 70)
+    val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
     subscription.sendMessage(msg1)
 
     verify(transport, times(1)).transport(msg1, TaskId(1, 1))
     assert(subscription.minClock == 70)
 
-    val msg2 = new Message("0", timestamp = 50)
+    val msg2 = new Message("0", timestamp = Instant.ofEpochMilli(50))
     subscription.sendMessage(msg2)
     verify(transport, times(1)).transport(msg2, TaskId(1, 0))
 
@@ -120,7 +120,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 
   it should "report minClock as Long.MaxValue when there is no pending message" in {
     val (subscription, transport) = prepare
-    val msg1 = new Message("1", timestamp = 70)
+    val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
     subscription.sendMessage(msg1)
     assert(subscription.minClock == 70)
     subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))


Reply via email to