[GEARPUMP-217] Merge master into sql branch

Author: manuzhang <[email protected]>
Author: huafengw <[email protected]>
Author: darionyaphet <[email protected]>

Closes #218 from manuzhang/sync_master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1cf87bf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1cf87bf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1cf87bf7

Branch: refs/heads/sql
Commit: 1cf87bf77cac2d10614a8c3c7b0758071d4cd7cc
Parents: 54686e0
Author: manuzhang <[email protected]>
Authored: Sat Aug 19 19:57:23 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Sat Aug 19 19:57:32 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 .../scala/org/apache/gearpump/Message.scala     |   6 +-
 .../main/scala/org/apache/gearpump/Time.scala   |  34 ++++
 .../gearpump/cluster/ClusterMessage.scala       |  20 +--
 .../cluster/appmaster/ApplicationMetaData.scala |   1 -
 .../appmaster/ApplicationRuntimeInfo.scala      |  12 +-
 .../cluster/client/RuntimeEnvironment.scala     |   4 +-
 .../cluster/embedded/EmbeddedCluster.scala      |   2 -
 .../embedded/EmbeddedRuntimeEnvironment.scala   |  48 ++++++
 .../org/apache/gearpump/cluster/main/Info.scala |   3 +-
 .../org/apache/gearpump/cluster/main/Kill.scala |   3 +-
 .../gearpump/cluster/main/MainRunner.scala      |   3 +-
 .../apache/gearpump/cluster/main/Replay.scala   |   3 +-
 .../gearpump/cluster/master/AppManager.scala    |   7 +-
 .../cluster/master/AppMasterLauncher.scala      |   2 +-
 .../apache/gearpump/cluster/master/Master.scala |   1 -
 .../gearpump/cluster/scheduler/Scheduler.scala  |   4 +-
 .../gearpump/jarstore/FileDirective.scala       |   2 +-
 .../apache/gearpump/jarstore/FileServer.scala   |   4 +-
 .../scala/org/apache/gearpump/package.scala     |  29 ----
 .../scala/org/apache/gearpump/util/Graph.scala  |  20 +--
 .../gearpump/util/HistoryMetricsService.scala   |   4 +-
 .../org/apache/gearpump/util/LogUtil.scala      |   2 -
 .../scala/org/apache/gearpump/util/Util.scala   |   2 +-
 .../apache/gearpump/cluster/main/MainSpec.scala |   2 +-
 .../gearpump/util/RestartPolicySpec.scala       |   2 -
 .../DistServiceAppMasterSpec.scala              |   2 +-
 .../wordcount/dsl/WindowedWordCount.scala       |   6 +-
 .../GearpumpMaterializerSession.scala           |   5 +-
 .../gearpump/akkastream/example/Test8.scala     |   2 +-
 .../gearpump/akkastream/example/Test9.scala     |   2 +-
 .../akkastream/graph/GraphPartitioner.scala     |   2 +-
 .../gearpump/akkastream/graph/SubGraph.scala    |   1 -
 .../materializer/RemoteMaterializerImpl.scala   |   4 +-
 .../experiments/storm/main/GearpumpNimbus.scala |   2 +-
 .../producer/StormSpoutOutputCollector.scala    |   8 +-
 .../storm/topology/GearpumpStormComponent.scala |   5 +-
 .../storm/topology/GearpumpTuple.scala          |   6 +-
 .../storm/util/StormOutputCollector.scala       |   9 +-
 .../StormBoltOutputCollectorSpec.scala          |   2 -
 .../StormSpoutOutputCollectorSpec.scala         |   2 -
 .../topology/GearpumpStormComponentSpec.scala   |   5 +-
 .../storm/topology/GearpumpTupleSpec.scala      |   4 +-
 .../storm/util/StormOutputCollectorSpec.scala   |   9 +-
 .../experiments/yarn/glue/NMClient.scala        |   2 +-
 .../experiments/yarn/glue/RMClient.scala        |   2 +-
 external/hadoopfs/README.md                     |   2 +-
 .../hadoop/HadoopCheckpointStore.scala          |   6 +-
 .../hadoop/HadoopCheckpointStoreFactory.scala   |   2 -
 .../lib/HadoopCheckpointStoreReader.scala       |   8 +-
 .../lib/HadoopCheckpointStoreWriter.scala       |   4 +-
 .../lib/rotation/FileSizeRotationSpec.scala     |   4 +-
 .../gearpump/streaming/kafka/dsl/KafkaDSL.scala |   1 -
 .../kafka/lib/source/AbstractKafkaSource.scala  |   5 +-
 .../streaming/kafka/lib/store/KafkaStore.scala  |  10 +-
 .../streaming/kafka/KafkaStoreSpec.scala        |  16 +-
 project/BuildExamples.scala                     |  17 +-
 project/BuildExperiments.scala                  |   2 +-
 project/BuildGearpump.scala                     |  56 +++---
 project/Dependencies.scala                      |  11 +-
 .../gearpump/services/MasterService.scala       |   2 +-
 .../gearpump/services/StaticService.scala       |   1 -
 .../services/AppMasterServiceSpec.scala         |   4 +-
 .../gearpump/services/MasterServiceSpec.scala   |   4 +-
 .../gearpump/streaming/ClusterMessage.scala     |   6 +-
 .../gearpump/streaming/StreamApplication.scala  |  13 +-
 .../streaming/appmaster/AppMaster.scala         |   6 +-
 .../streaming/appmaster/ClockService.scala      |  54 +++---
 .../streaming/appmaster/JarScheduler.scala      |   6 +-
 .../appmaster/StreamAppMasterSummary.scala      |   8 +-
 .../streaming/appmaster/TaskManager.scala       |   6 +-
 .../dsl/api/functions/FilterFunction.scala      |   2 -
 .../dsl/api/functions/FoldFunction.scala        |   2 -
 .../dsl/api/functions/MapFunction.scala         |   2 -
 .../api/functions/SerializableFunction.scala    |  32 ++++
 .../dsl/javaapi/functions/FlatMapFunction.scala |   2 +-
 .../apache/gearpump/streaming/dsl/package.scala |  48 ++++++
 .../apache/gearpump/streaming/dsl/plan/OP.scala | 171 ++++++++++++-------
 .../gearpump/streaming/dsl/plan/Planner.scala   |  20 ++-
 .../streaming/dsl/scalaapi/Stream.scala         |   1 +
 .../streaming/dsl/scalaapi/StreamApp.scala      |  25 ++-
 .../scalaapi/functions/FlatMapFunction.scala    |   2 +-
 .../functions/SerializableFunction.scala        |  32 ----
 .../dsl/window/api/AccumulationMode.scala       |  10 ++
 .../streaming/dsl/window/api/Trigger.scala      |   9 +
 .../dsl/window/api/WindowFunction.scala         |  30 +++-
 .../streaming/dsl/window/api/Windows.scala      |  18 +-
 .../dsl/window/impl/ReduceFnRunner.scala        |  29 ----
 .../streaming/dsl/window/impl/Window.scala      |  12 +-
 .../dsl/window/impl/WindowRunner.scala          |  25 ++-
 .../streaming/metrics/ProcessorAggregator.scala |  12 +-
 .../gearpump/streaming/sink/DataSinkTask.scala  |   4 +
 .../gearpump/streaming/source/Watermark.scala   |  11 +-
 .../streaming/state/api/MonoidState.scala       |   6 +-
 .../streaming/state/api/PersistentState.scala   |   8 +-
 .../streaming/state/api/PersistentTask.scala    |   7 +-
 .../state/impl/CheckpointManager.scala          |  14 +-
 .../state/impl/InMemoryCheckpointStore.scala    |  12 +-
 .../streaming/state/impl/NonWindowState.scala   |   6 +-
 .../gearpump/streaming/state/impl/Window.scala  |  10 +-
 .../streaming/state/impl/WindowState.scala      |  16 +-
 .../streaming/task/SerializedMessage.scala      |   4 +-
 .../gearpump/streaming/task/Subscription.scala  |  13 +-
 .../apache/gearpump/streaming/task/Task.scala   |   5 +-
 .../gearpump/streaming/task/TaskActor.scala     |   9 +-
 .../streaming/task/TaskControlMessage.scala     |  14 +-
 .../gearpump/streaming/task/TaskWrapper.scala   |   7 +-
 .../transaction/api/CheckpointStore.scala       |   8 +-
 .../transaction/api/TimeStampFilter.scala       |   7 +-
 .../gearpump/streaming/StreamingTestUtil.scala  |   1 -
 .../streaming/appmaster/AppMasterSpec.scala     |   4 +-
 .../streaming/appmaster/DagManagerSpec.scala    |   2 -
 .../streaming/appmaster/TaskManagerSpec.scala   |   4 +-
 .../streaming/dsl/task/GroupByTaskSpec.scala    |   2 +-
 .../streaming/dsl/task/TransformTaskSpec.scala  |   1 -
 .../state/impl/CheckpointManagerSpec.scala      |   8 +-
 .../state/impl/NonWindowStateSpec.scala         |   8 +-
 .../streaming/state/impl/WindowSpec.scala       |   6 +-
 .../streaming/state/impl/WindowStateSpec.scala  |  15 +-
 .../streaming/task/SubscriptionSpec.scala       |   6 +-
 120 files changed, 728 insertions(+), 530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 95c1427..8148c32 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,6 @@
 language:
 - scala
-sudo: false
+sudo: required
 jdk:
 - oraclejdk8
 addons:

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala 
b/core/src/main/scala/org/apache/gearpump/Message.scala
index 4dc5c09..7051396 100644
--- a/core/src/main/scala/org/apache/gearpump/Message.scala
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -20,6 +20,8 @@ package org.apache.gearpump
 
 import java.time.Instant
 
+import org.apache.gearpump.Time.MilliSeconds
+
 trait Message {
 
   val value: Any
@@ -35,7 +37,7 @@ trait Message {
  *
  * @param value Accept any type except Null, Nothing and Unit
  */
-case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message 
{
+case class DefaultMessage(value: Any, timeInMillis: MilliSeconds) extends 
Message {
 
   /**
    * @param value Accept any type except Null, Nothing and Unit
@@ -74,7 +76,7 @@ object Message {
    * @param value Accept any type except Null, Nothing and Unit
    * @param timestamp timestamp must be smaller than Long.MaxValue
    */
-  def apply(value: Any, timestamp: TimeStamp): Message = {
+  def apply(value: Any, timestamp: MilliSeconds): Message = {
     DefaultMessage(value, timestamp)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/Time.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Time.scala 
b/core/src/main/scala/org/apache/gearpump/Time.scala
new file mode 100644
index 0000000..054becf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/Time.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump
+
+/**
+ * Types and constants of time in gearpump
+ */
+object Time {
+  type MilliSeconds = Long
+
+  // maximum valid time that won't overflow when being converted to milli-seconds
+  // Long.MaxValue is reserved for unreachable time
+  val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
+
+  // minimum valid time that won't overflow when being converted to milli-seconds
+  val MIN_TIME_MILLIS: Long = Long.MinValue
+
+  val UNREACHABLE: Long = Long.MaxValue
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
index e8956ac..8a067b5 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.worker.{WorkerId, 
WorkerSummary}
 import scala.util.Try
 import akka.actor.ActorRef
 import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.appmaster.WorkerInfo
 import org.apache.gearpump.cluster.master.MasterSummary
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
@@ -142,7 +142,7 @@ object MasterToClient {
 
   case class MasterConfig(config: Config)
 
-  case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
+  case class HistoryMetricsItem(time: MilliSeconds, value: MetricType)
 
   /**
    * History metrics returned from master, worker, or app master.
@@ -157,7 +157,7 @@ object MasterToClient {
   case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
 
   /** Return the last error of this streaming application job */
-  case class LastFailure(time: TimeStamp, error: String)
+  case class LastFailure(time: MilliSeconds, error: String)
 
   sealed trait ApplicationResult
 
@@ -208,8 +208,8 @@ object AppMasterToMaster {
     def appName: String
     def actorPath: String
     def status: ApplicationStatus
-    def startTime: TimeStamp
-    def uptime: TimeStamp
+    def startTime: MilliSeconds
+    def uptime: MilliSeconds
     def user: String
   }
 
@@ -220,8 +220,8 @@ object AppMasterToMaster {
       appName: String = null,
       actorPath: String = null,
       status: ApplicationStatus = ApplicationStatus.ACTIVE,
-      startTime: TimeStamp = 0L,
-      uptime: TimeStamp = 0L,
+      startTime: MilliSeconds = 0L,
+      uptime: MilliSeconds = 0L,
       user: String = null)
     extends AppMasterSummary
 
@@ -244,7 +244,7 @@ object AppMasterToMaster {
    * Denotes the application state change of an app.
    */
   case class ApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
-      timeStamp: TimeStamp, error: Throwable)
+      timeStamp: MilliSeconds, error: Throwable)
 }
 
 object MasterToAppMaster {
@@ -263,8 +263,8 @@ object MasterToAppMaster {
 
   sealed trait StreamingType
   case class AppMasterData(status: ApplicationStatus, appId: Int = 0, appName: 
String = null,
-      appMasterPath: String = null, workerPath: String = null, submissionTime: 
TimeStamp = 0,
-      startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null)
+      appMasterPath: String = null, workerPath: String = null, submissionTime: 
MilliSeconds = 0,
+      startTime: MilliSeconds = 0, finishTime: MilliSeconds = 0, user: String 
= null)
 
   case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
index b011a0d..bcaf1f0 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
@@ -19,7 +19,6 @@
 package org.apache.gearpump.cluster.appmaster
 
 import org.apache.gearpump.cluster.{AppDescription, AppJar}
-import akka.routing.MurmurHash._
 
 /**
  * The meta data of an application, which stores the crucial infomation of how 
to launch

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
index d9b73e2..1054628 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.appmaster
 
 import akka.actor.ActorRef
 import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.{ApplicationStatus, 
ApplicationTerminalStatus}
 
 /** Run time info of Application */
@@ -31,9 +31,9 @@ case class ApplicationRuntimeInfo(
     appMaster: ActorRef = ActorRef.noSender,
     worker: ActorRef = ActorRef.noSender,
     user: String = "",
-    submissionTime: TimeStamp = 0,
-    startTime: TimeStamp = 0,
-    finishTime: TimeStamp = 0,
+    submissionTime: MilliSeconds = 0,
+    startTime: MilliSeconds = 0,
+    finishTime: MilliSeconds = 0,
     config: Config = ConfigFactory.empty(),
     status: ApplicationStatus = ApplicationStatus.NONEXIST) {
 
@@ -41,11 +41,11 @@ case class ApplicationRuntimeInfo(
     this.copy(appMaster = appMaster, worker = worker)
   }
 
-  def onAppMasterActivated(timeStamp: TimeStamp): ApplicationRuntimeInfo = {
+  def onAppMasterActivated(timeStamp: MilliSeconds): ApplicationRuntimeInfo = {
     this.copy(startTime = timeStamp, status = ApplicationStatus.ACTIVE)
   }
 
-  def onFinalStatus(timeStamp: TimeStamp, finalStatus: 
ApplicationTerminalStatus):
+  def onFinalStatus(timeStamp: MilliSeconds, finalStatus: 
ApplicationTerminalStatus):
     ApplicationRuntimeInfo = {
     this.copy(finishTime = timeStamp, status = finalStatus)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
index cf5842f..c78e06c 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.client
 
 import com.typesafe.config.Config
 import 
org.apache.gearpump.cluster.client.RuntimeEnvironment.RemoteClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt
+import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironment
 
 /**
  * The RuntimeEnvironment is the context decides where an application is 
submitted to.
@@ -45,7 +45,7 @@ object RuntimeEnvironment {
   class RemoteClientContext(akkaConf: Config) extends ClientContext(akkaConf, 
null, null)
 
   def get() : RuntimeEnvironment = {
-    Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt)
+    Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironment)
   }
 
   def newClientContext(akkaConf: Config): ClientContext = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
index 8abcd96..3fcd569 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -24,8 +24,6 @@ import scala.concurrent.duration.Duration
 import akka.actor.{ActorRef, ActorSystem, Props}
 import com.typesafe.config.{Config, ConfigValueFactory}
 import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import 
org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext
 import org.apache.gearpump.cluster.master.{Master => MasterActor}
 import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
 import 
org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
 GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala
new file mode 100644
index 0000000..bf3b5a7
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironment.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.embedded
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment}
+import 
org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironment.EmbeddedClientContext
+
+/**
+ * The EmbeddedRuntimeEnvironment is initiated when a user tries to launch their application
+ * from an IDE. It will create an embedded cluster and the user's application will run in a single
+ * local process.
+ */
+class EmbeddedRuntimeEnvironment extends RuntimeEnvironment {
+  override def newClientContext(akkaConf: Config): ClientContext = {
+    new EmbeddedClientContext(akkaConf)
+  }
+}
+
+object EmbeddedRuntimeEnvironment {
+  class EmbeddedClientContext private(cluster: EmbeddedCluster)
+    extends ClientContext(cluster.config, cluster.system, cluster.master) {
+
+    def this(akkaConf: Config) {
+      this(new EmbeddedCluster(akkaConf))
+    }
+
+    override def close(): Unit = {
+      super.close()
+      cluster.stop()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
index e2f8bad..fa2d429 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
@@ -19,8 +19,7 @@ package org.apache.gearpump.cluster.main
 
 import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
+import org.apache.gearpump.util.AkkaApp
 
 /** Tool to query master info */
 object Info extends AkkaApp with ArgumentsParser {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
index 4f07707..d5a3520 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
@@ -19,8 +19,7 @@
 package org.apache.gearpump.cluster.main
 
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
+import org.apache.gearpump.util.AkkaApp
 
 /** Tool to kill an App */
 object Kill extends AkkaApp with ArgumentsParser {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
index 42c2081..11b7239 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
@@ -18,8 +18,7 @@
 
 package org.apache.gearpump.cluster.main
 
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
+import org.apache.gearpump.util.AkkaApp
 
 /** Tool to run any main class by providing a jar */
 object MainRunner extends AkkaApp with ArgumentsParser {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
index 03ec899..8c2d7ef 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
@@ -18,8 +18,7 @@
 package org.apache.gearpump.cluster.main
 
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
+import org.apache.gearpump.util.AkkaApp
 
 // Internal tool to restart an application
 object Replay extends AkkaApp with ArgumentsParser {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index e41a2c5..450d512 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.cluster.master
 
 import akka.actor._
 import akka.pattern.ask
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.gearpump._
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, 
SaveAppDataFailed, _}
 import org.apache.gearpump.cluster.AppMasterToWorker._
 import org.apache.gearpump.cluster.{ApplicationStatus, 
ApplicationTerminalStatus}
@@ -38,7 +38,6 @@ import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, 
Util, _}
 import org.slf4j.Logger
 
 import scala.concurrent.Future
-import scala.concurrent.duration._
 import scala.util.{Failure, Success}
 
 /**
@@ -228,7 +227,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
   }
 
   private def onApplicationStatusChanged(appId: Int, newStatus: 
ApplicationStatus,
-      timeStamp: TimeStamp, error: Throwable): Unit = {
+      timeStamp: MilliSeconds, error: Throwable): Unit = {
     applicationRegistry.get(appId) match {
       case Some(appRuntimeInfo) =>
         if (appRuntimeInfo.status.canTransitTo(newStatus)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
index 2d79558..d791a10 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
@@ -34,7 +34,7 @@ import 
org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownEx
 import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
ApplicationRuntimeInfo, WorkerInfo}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
WorkerInfo}
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
 import org.apache.gearpump.cluster.{AppDescription, AppJar, _}
 import org.apache.gearpump.transport.HostPort

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
index 8da417e..68a12d1 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -22,7 +22,6 @@ import java.lang.management.ManagementFactory
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.jarstore.JarStoreServer
 
-import scala.collection.JavaConverters._
 import scala.collection.immutable
 
 import akka.actor._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
index ec9f1ba..1329127 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -18,7 +18,7 @@
 package org.apache.gearpump.cluster.scheduler
 
 import akka.actor.{Actor, ActorRef}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, 
UpdateResourceSucceed, WorkerRegistered}
 import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
 import org.apache.gearpump.cluster.master.Master.WorkerTerminated
@@ -71,7 +71,7 @@ abstract class Scheduler extends Actor {
 
 object Scheduler {
   case class PendingRequest(
-      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: 
TimeStamp)
+      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: 
MilliSeconds)
 
   case class ApplicationFinished(appId: Int)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala 
b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
index 969da04..d45e102 100644
--- a/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileDirective.scala
@@ -146,7 +146,7 @@ object FileDirective {
           if (p.filename.isDefined) {
             val targetPath = File.createTempFile(s"userfile_${p.name}_",
               s"${p.filename.getOrElse("")}")
-            val writtenFuture = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
+            val writtenFuture = p.entity.dataBytes.runWith(FileIO.toPath(targetPath.toPath))
             writtenFuture.map(written =>
               if (written.count > 0) {
                 Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count)))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala 
b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
index 4ce8f2d..8c1d19a 100644
--- a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
@@ -139,14 +139,14 @@ object FileServer {
       // Download file to local
       val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
       val downloaded = response.flatMap { response =>
-        response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
+        response.entity.dataBytes.runWith(FileIO.toPath(saveAs.toPath))
       }
       downloaded.map(written => Unit)
     }
 
     private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
       val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
-        FileIO.fromFile(file, chunkSize = 100000))
+        FileIO.fromPath(file.toPath, chunkSize = 100000))
       val body = Source.single(
         Multipart.FormData.BodyPart(
           "uploadfile",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala 
b/core/src/main/scala/org/apache/gearpump/package.scala
deleted file mode 100644
index 6e20277..0000000
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache
-
-package object gearpump {
-  type TimeStamp = Long
-
-  // maximum time won't overflow when converted to milli-seconds
-  val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
-
-  // minimum time won't overflow when converted to milli-seconds
-  val MIN_TIME_MILLIS: Long = Long.MinValue
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/Graph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Graph.scala 
b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
index 609b133..f110f5f 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Graph.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
@@ -449,15 +449,15 @@ object Graph {
       new Path(path :+ Right(edge))
     }
 
-    def ~>[Node >: N](node: Node): Path[Node, E] = {
+    def ~>[NodeT >: N](node: NodeT): Path[NodeT, E] = {
       new Path(path :+ Left(node))
     }
 
-    def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
+    def to[NodeT >: N, EdgeT >: E](node: NodeT, edge: EdgeT): Path[NodeT, EdgeT] = {
       this ~ edge ~> node
     }
 
-    private[Graph] def updategraph[Node >: N, Edge >: E](graph: Graph[Node, Edge]): Unit = {
+    private[Graph] def updategraph[NodeT >: N, EdgeT >: E](graph: Graph[NodeT, EdgeT]): Unit = {
       val nodeEdgePair: Tuple2[Option[N], Option[E]] = (None, None)
       path.foldLeft(nodeEdgePair) { (pair, either) =>
         val (lastNode, lastEdge) = pair
@@ -465,7 +465,7 @@ object Graph {
           case Left(node) =>
             graph.addVertex(node)
             if (lastNode.isDefined) {
-              graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[Edge]), node)
+              graph.addEdge(lastNode.get, lastEdge.getOrElse(null.asInstanceOf[EdgeT]), node)
             }
             (Some(node), None)
           case Right(edge) =>
@@ -481,29 +481,29 @@ object Graph {
 
   implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) {
 
-    override def ~[Edge](edge: Edge): Path[N, Edge] = {
+    override def ~[EdgeT](edge: EdgeT): Path[N, EdgeT] = {
       new Path(List(Left(self), Right(edge)))
     }
 
-    override def ~>[Node >: N](node: Node): Path[Node, E] = {
+    override def ~>[NodeT >: N](node: NodeT): Path[NodeT, E] = {
       new NodeList(List(self, node))
     }
 
-    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
+    override def to[NodeT >: N, EdgeT >: E](node: NodeT, edge: EdgeT): Path[NodeT, EdgeT] = {
       this ~ edge ~> node
     }
   }
 
   class NodeList[N, E](nodes: List[N]) extends Path[N, E](nodes.map(Left(_))) {
-    override def ~[Edge](edge: Edge): Path[N, Edge] = {
+    override def ~[EdgeT](edge: EdgeT): Path[N, EdgeT] = {
       new Path(nodes.map(Left(_)) :+ Right(edge))
     }
 
-    override def ~>[Node >: N](node: Node): Path[Node, E] = {
+    override def ~>[NodeT >: N](node: NodeT): Path[NodeT, E] = {
       new NodeList(nodes :+ node)
     }
 
-    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
+    override def to[NodeT >: N, EdgeT >: E](node: NodeT, edge: EdgeT): Path[NodeT, EdgeT] = {
       this ~ edge ~> node
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala 
b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
index ee59678..d45d761 100644
--- a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
@@ -25,7 +25,7 @@ import akka.actor.Actor
 import com.typesafe.config.Config
 import org.slf4j.Logger
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption}
 import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
 import org.apache.gearpump.metrics.Metrics._
@@ -217,7 +217,7 @@ object HistoryMetricsService {
       add(inputMetrics, System.currentTimeMillis())
     }
 
-    def add(inputMetrics: MetricType, now: TimeStamp): Unit = {
+    def add(inputMetrics: MetricType, now: MilliSeconds): Unit = {
 
       val metrics = HistoryMetricsItem(now, inputMetrics)
       latest = List(metrics)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala 
b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
index 44cb87f..98850a8 100644
--- a/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
@@ -19,9 +19,7 @@
 package org.apache.gearpump.util
 
 import java.io.File
-import java.net.InetAddress
 import java.util.Properties
-import scala.util.Try
 
 import com.typesafe.config.Config
 import org.apache.log4j.PropertyConfigurator

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/main/scala/org/apache/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala 
b/core/src/main/scala/org/apache/gearpump/util/Util.scala
index fe4b540..2763fd2 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Util.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala
@@ -27,7 +27,7 @@ import scala.util.{Failure, Success, Try}
 import com.typesafe.config.{Config, ConfigFactory}
 
 import org.apache.gearpump.cluster.AppJar
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
+import org.apache.gearpump.jarstore.JarStoreClient
 import org.apache.gearpump.transport.HostPort
 
 object Util {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala 
b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
index 29fcd26..554d9c3 100644
--- a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -35,7 +35,7 @@ import org.apache.gearpump.util.{Constants, LogUtil, Util}
 import org.scalatest._
 
 import scala.concurrent.Future
-import scala.util.{Success, Try}
+import scala.util.Success
 
 class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala 
b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
index 5d0c66d..2dcae2f 100644
--- a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
@@ -20,8 +20,6 @@ package org.apache.gearpump.util
 
 import org.scalatest.{FlatSpec, Matchers}
 
-import scala.concurrent.duration._
-
 class RestartPolicySpec extends FlatSpec with Matchers {
 
   "RestartPolicy" should "forbid too many restarts" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
 
b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
index b78bfc2..741a883 100644
--- 
a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ 
b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
 import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, 
RegisterAppMaster, RequestResource}
 import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, 
ResourceAllocated, WorkerList}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
ApplicationRuntimeInfo}
+import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, 
ResourceAllocation, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, 
TestUtil, UserConfig}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 2aa1bb4..379c7b6 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
 import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, 
FixedWindows}
-import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.source.{DataSource, Watermark}
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.AkkaApp
 
@@ -45,7 +45,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser {
       groupBy(_._1).
       sum.sink(new LoggerSink)
 
-    context.submit(app)
+    context.submit(app).waitUntilFinish()
     context.close()
   }
 
@@ -79,7 +79,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser {
 
     override def getWatermark: Instant = {
       if (data.isEmpty) {
-        watermark = watermark.plusMillis(1)
+        watermark = Watermark.MAX
       }
       watermark
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
index 8a869d2..afe7d33 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
@@ -69,9 +69,10 @@ class GearpumpMaterializerSession(system: ActorSystem, 
topLevel: Module,
           enterScope(copied)
           materializedValues.put(copied, materializeModule(copied, currentAttributes))
           exitScope(copied)
-        case composite =>
+        case composite: CompositeModule =>
           materializedValues.put(composite, materializeComposite(composite, currentAttributes))
-        case EmptyModule =>
+        case _ =>
+          // ignore other modules
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
index ad2ac61..e87752d 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.akkastream.example
 
 import akka.NotUsed
 import akka.actor.ActorSystem
-import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
 import akka.stream.scaladsl._
 import org.apache.gearpump.akkastream.GearpumpMaterializer
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
index 66414e0..25c7071 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala
@@ -26,7 +26,7 @@ import org.apache.gearpump.akkastream.GearpumpMaterializer
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
 import org.apache.gearpump.util.AkkaApp
 
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Await
 import scala.concurrent.duration._
  
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
index f7919c0..d764331 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -26,7 +26,7 @@ import 
org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
 import org.apache.gearpump.akkastream.module._
 import akka.stream.impl.StreamLayout.Module
 import akka.stream.impl.fusing.GraphStageModule
-import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SingleSource}
 import akka.stream.impl.{SinkModule, SourceModule}
 import org.apache.gearpump.util.Graph
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
index a74143e..494be45 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/SubGraph.scala
@@ -18,7 +18,6 @@
 
 package org.apache.gearpump.akkastream.graph
 
-import akka.actor.ActorSystem
 import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
 import akka.stream.impl.StreamLayout.Module
 import org.apache.gearpump.util.Graph

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index e2cdbd4..a62b8e3 100644
--- 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -161,7 +161,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
           ProcessorOp(processor.processor, parallelism, updatedConf, "source")
         case sinkBridge: SinkBridgeModule[_, _] =>
           ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
-        case groupBy: GroupByModule[Any, Any] =>
+        case groupBy: GroupByModule[_, _] =>
           GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf)
         case reduce: ReduceModule[_] =>
           reduceOp(reduce.f, conf)
@@ -238,7 +238,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], 
system: ActorSystem) {
         val foldConf = conf.withValue(FoldTask.ZERO, 
fold.zero.asInstanceOf[AnyRef]).
           withValue(FoldTask.AGGREGATOR, fold.f)
         ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
-      case groupBy: GroupBy[Any, Any] =>
+      case groupBy: GroupBy[_, _] =>
         GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf)
       case groupedWithin: GroupedWithin[_] =>
         val diConf = 
conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 987546c..4a438d7 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -37,7 +37,7 @@ import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster, 
UserConfig}
+import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig}
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
 import org.apache.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback
 import org.apache.gearpump.experiments.storm.util.{GraphBuilder, 
StormConstants, StormUtil, TimeCacheMapWrapper}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
index 5794b1d..9b9bea7 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
@@ -21,10 +21,10 @@ package org.apache.gearpump.experiments.storm.producer
 import java.util.{List => JList}
 
 import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
 
-case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp)
+case class PendingMessage(id: Object, messageTime: MilliSeconds, startTime: MilliSeconds)
 
 /**
  * this is used by Storm Spout to emit messages
@@ -57,7 +57,7 @@ private[storm] class StormSpoutOutputCollector(
     setPendingOrAck(messageId, curTime, curTime)
   }
 
-  def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
+  def ackPendingMessage(checkpointClock: MilliSeconds): Unit = {
     this.checkpointClock = checkpointClock
     nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) =>
       if (messageTime <= this.checkpointClock) {
@@ -83,7 +83,7 @@ private[storm] class StormSpoutOutputCollector(
     nextPendingMessage = None
   }
 
-  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp)
+  private def setPendingOrAck(messageId: Object, startTime: MilliSeconds, messageTime: MilliSeconds)
     : Unit = {
     if (ackEnabled) {
       val newPendingMessage = PendingMessage(messageId, messageTime, startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index 248ca44..6aa5dc9 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -43,7 +43,8 @@ import 
org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUt
 import org.apache.gearpump.streaming.DAG
 import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId}
 import org.apache.gearpump.util.{Constants, LogUtil}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.slf4j.Logger
 
 import scala.collection.JavaConverters._
@@ -149,7 +150,7 @@ object GearpumpStormComponent {
       }
     }
 
-    def checkpoint(clock: TimeStamp): Unit = {
+    def checkpoint(clock: MilliSeconds): Unit = {
       collector.ackPendingMessage(clock)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
index eb61acb..9f2fa1f 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
@@ -23,7 +23,7 @@ import java.util.{List => JList}
 import backtype.storm.task.GeneralTopologyContext
 import backtype.storm.tuple.{Tuple, TupleImpl}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * this carries Storm tuple values in the Gearpump world
@@ -42,7 +42,7 @@ private[storm] class GearpumpTuple(
    * @param topologyContext topology context used for all tasks
    * @return a Tuple
    */
-  def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = {
+  def toTuple(topologyContext: GeneralTopologyContext, timestamp: MilliSeconds): Tuple = {
     TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp)
   }
 
@@ -64,6 +64,6 @@ private[storm] class GearpumpTuple(
 }
 
 case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef],
-    sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp)
+    sourceTaskId: Integer, sourceStreamId: String, timestamp: MilliSeconds)
   extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
index fd023a9..a95725e 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -28,7 +28,8 @@ import backtype.storm.task.TopologyContext
 import backtype.storm.tuple.Fields
 import backtype.storm.utils.Utils
 import org.slf4j.Logger
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.experiments.storm.util.StormUtil._
 import org.apache.gearpump.streaming.ProcessorId
@@ -56,7 +57,7 @@ object StormOutputCollector {
         streamGroupers, componentToProcessorId, values)
     }
     new StormOutputCollector(stormTaskId, taskToComponent, targets, 
getTargetPartitionsFn,
-      taskContext, MIN_TIME_MILLIS)
+      taskContext, Time.MIN_TIME_MILLIS)
   }
 
   /**
@@ -164,7 +165,7 @@ class StormOutputCollector(
     targets: JMap[String, JMap[String, Grouping]],
     getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, 
Array[Int]], JList[Integer]),
     val taskContext: TaskContext,
-    private var timestamp: TimeStamp) {
+    private var timestamp: MilliSeconds) {
   import org.apache.gearpump.experiments.storm.util.StormOutputCollector._
 
   /**
@@ -213,7 +214,7 @@ class StormOutputCollector(
   /**
    * set timestamp from each incoming Message if not attached.
    */
-  def setTimestamp(timestamp: TimeStamp): Unit = {
+  def setTimestamp(timestamp: MilliSeconds): Unit = {
     this.timestamp = timestamp
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
index 430b1c0..2fe124d 100644
--- 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
+++ 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.experiments.storm.processor
 
-import java.util.{List => JList}
-
 import backtype.storm.tuple.Tuple
 import backtype.storm.utils.Utils
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
index 49afe05..8faf7d2 100644
--- 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
+++ 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.experiments.storm.producer
 
-import java.util.{List => JList}
-
 import backtype.storm.spout.ISpout
 import backtype.storm.utils.Utils
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index 0891070..50204ca 100644
--- 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -24,12 +24,13 @@ import akka.actor.ActorRef
 import backtype.storm.spout.{ISpout, SpoutOutputCollector}
 import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, 
TopologyContext}
 import backtype.storm.tuple.Tuple
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
 import 
org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt,
 GearpumpSpout}
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
 import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
 import org.apache.gearpump.streaming.{DAG, MockUtil}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
 import org.mockito.Matchers.{anyObject, eq => mockitoEq}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -75,7 +76,7 @@ class GearpumpStormComponentSpec
   property("GearpumpBolt lifecycle") {
     val timestampGen = Gen.chooseNum[Long](0L, 1000L)
     val freqGen = Gen.chooseNum[Int](1, 100)
-    forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) =>
+    forAll(timestampGen, freqGen) { (timestamp: MilliSeconds, freq: Int) =>
       val config = mock[JMap[AnyRef, AnyRef]]
       val bolt = mock[IBolt]
       val taskContext = MockUtil.mockTaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
index f12e54f..dacbdfd 100644
--- 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
+++ 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
@@ -21,7 +21,7 @@ import java.util.{List => JList}
 
 import backtype.storm.task.GeneralTopologyContext
 import backtype.storm.tuple.Fields
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -40,7 +40,7 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks 
with Matchers with
     } yield new GearpumpTuple(values, new Integer(sourceTaskId), 
sourceStreamId, null)
 
     forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
-      (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: 
TimeStamp) =>
+      (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: 
MilliSeconds) =>
         val topologyContext = mock[GeneralTopologyContext]
         val fields = new 
Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
         
when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index 05627c9..7fab2cc 100644
--- 
a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ 
b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -27,7 +27,8 @@ import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.streaming.MockUtil
 
@@ -41,7 +42,7 @@ class StormOutputCollectorSpec
 
   property("StormOutputCollector emits tuple values into a stream") {
     forAll(timestampGen, streamIdGen, valuesGen) {
-      (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
+      (timestamp: MilliSeconds, streamId: String, values: JList[AnyRef]) =>
         val targets = mock[JMap[String, JMap[String, Grouping]]]
         val taskToComponent = mock[JMap[Integer, String]]
         val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
@@ -52,7 +53,7 @@ class StormOutputCollectorSpec
           targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
         val stormOutputCollector = new StormOutputCollector(stormTaskId, 
taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
+          targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         stormOutputCollector.emit(streamId, values) shouldBe 
StormOutputCollector.EMPTY_LIST
@@ -85,7 +86,7 @@ class StormOutputCollectorSpec
           targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
         val stormOutputCollector = new StormOutputCollector(stormTaskId, 
taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
+          targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         verify(taskContext, times(0)).output(anyObject[Message])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
 
b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
index 59f3832..810b557 100644
--- 
a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
+++ 
b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
@@ -25,7 +25,7 @@ import com.typesafe.config.Config
 import 
org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
 import org.apache.gearpump.experiments.yarn.glue.Records._
 import org.apache.gearpump.util.LogUtil
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, 
ApplicationReport => YarnApplicationReport, Container => YarnContainer, 
ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId 
=> YarnNodeId, Resource => YarnResource}
+import org.apache.hadoop.yarn.api.records.{ContainerId => YarnContainerId, 
ContainerStatus => YarnContainerStatus}
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
 
b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
index 629e233..0625b2d 100644
--- 
a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
+++ 
b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorRef
 import 
org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered,
 ContainersAllocated, ContainersCompleted, ResourceManagerException, 
ShutdownApplication}
 import org.apache.gearpump.experiments.yarn.glue.Records._
 import org.apache.gearpump.util.LogUtil
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, 
ApplicationReport => YarnApplicationReport, Container => YarnContainer, 
ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, 
FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => 
YarnResource}
+import org.apache.hadoop.yarn.api.records.{Container => YarnContainer, 
ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, 
FinalApplicationStatus, NodeReport, Priority}
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
 import org.slf4j.Logger

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/README.md
----------------------------------------------------------------------
diff --git a/external/hadoopfs/README.md b/external/hadoopfs/README.md
index 7a9aeef..b02c378 100644
--- a/external/hadoopfs/README.md
+++ b/external/hadoopfs/README.md
@@ -7,7 +7,7 @@ Gearpump components for interacting with HDFS file systems.
 1. File Rotation interface
 ```scala
 trait Rotation extends Serializable {
-  def mark(timestamp: TimeStamp, offset: Long): Unit
+  def mark(timestamp: MilliSeconds, offset: Long): Unit
   def shouldRotate: Boolean
   def rotate: Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
index e26a2ee..5f3ca74 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.slf4j.Logger
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation
 import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, 
HadoopCheckpointStoreWriter}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStore
@@ -72,7 +72,7 @@ class HadoopCheckpointStore(
    *     b. closes current writer and reset
    *     c. rotation rotates
    */
-  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+  override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit 
= {
     curTime = timestamp
     if (curWriter.isEmpty) {
       curStartTime = curTime
@@ -110,7 +110,7 @@ class HadoopCheckpointStore(
    *   5. looks for the checkpoint in the found store
    *   }}}
    */
-  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+  override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
     var checkpoint: Option[Array[Byte]] = None
 
     if (fs.exists(dir)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
index acc2438..4068413 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
@@ -23,10 +23,8 @@ import java.io.{ObjectInputStream, ObjectOutputStream}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil
 import org.apache.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, 
Rotation}
-import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, 
CheckpointStoreFactory}
 
 object HadoopCheckpointStoreFactory {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
index 082e963..cce4b5d 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
@@ -23,15 +23,15 @@ import java.io.EOFException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class HadoopCheckpointStoreReader(
     path: Path,
     hadoopConfig: Configuration)
-  extends Iterator[(TimeStamp, Array[Byte])] {
+  extends Iterator[(MilliSeconds, Array[Byte])] {
 
   private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
-  private var nextTimeStamp: Option[TimeStamp] = None
+  private var nextTimeStamp: Option[MilliSeconds] = None
   private var nextData: Option[Array[Byte]] = None
 
   override def hasNext: Boolean = {
@@ -56,7 +56,7 @@ class HadoopCheckpointStoreReader(
     }
   }
 
-  override def next(): (TimeStamp, Array[Byte]) = {
+  override def next(): (MilliSeconds, Array[Byte]) = {
     val timeAndData = for {
       time <- nextTimeStamp
       data <- nextData

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
index 11c12c4..ce7154a 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
@@ -21,12 +21,12 @@ package org.apache.gearpump.streaming.hadoop.lib
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
   private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig)
 
-  def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
+  def write(timestamp: MilliSeconds, data: Array[Byte]): Long = {
     stream.writeLong(timestamp)
     stream.writeInt(data.length)
     stream.write(data)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
 
b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
index a469956..8d0170e 100644
--- 
a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
+++ 
b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
 
@@ -31,7 +31,7 @@ class FileSizeRotationSpec extends PropSpec with 
PropertyChecks with Matchers {
   val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue)
 
   property("FileSize rotation rotates on file size") {
-    forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) 
=>
+    forAll(timestampGen, fileSizeGen) { (timestamp: MilliSeconds, fileSize: 
Long) =>
       val rotation = new FileSizeRotation(fileSize)
       rotation.shouldRotate shouldBe false
       rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes / 2)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
index 996ae0b..391cd42 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
@@ -20,7 +20,6 @@ package org.apache.gearpump.streaming.kafka.dsl
 import java.util.Properties
 
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl
 import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp}
 import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index d5a8729..6633bf4 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -36,7 +36,8 @@ import 
org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.streaming.transaction.api._
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.slf4j.Logger
 
 object AbstractKafkaSource {
@@ -147,7 +148,7 @@ abstract class AbstractKafkaSource(
     }
   }
 
-  private def maybeRecover(startTime: TimeStamp): Unit = {
+  private def maybeRecover(startTime: MilliSeconds): Unit = {
     checkpointStores.foreach { case (tp, store) =>
       for {
         bytes <- store.recover(startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
index e2450f4..dbbd0ea 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import com.twitter.bijection.Injection
 import kafka.api.OffsetRequest
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer
 import org.apache.gearpump.streaming.kafka.util.KafkaConfig
 import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
@@ -82,9 +82,9 @@ class KafkaStore private[kafka](
   extends CheckpointStore {
   import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore._
 
-  private var maxTime: TimeStamp = 0L
+  private var maxTime: MilliSeconds = 0L
 
-  override def persist(time: TimeStamp, checkpoint: Array[Byte]): Unit = {
+  override def persist(time: MilliSeconds, checkpoint: Array[Byte]): Unit = {
     // make sure checkpointed timestamp is monotonically increasing
     // hence (1, 1), (3, 2), (2, 3) is checkpointed as (1, 1), (3, 2), (3, 3)
     if (time > maxTime) {
@@ -98,14 +98,14 @@ class KafkaStore private[kafka](
     LOG.debug("KafkaStore persisted state ({}, {})", key, value)
   }
 
-  override def recover(time: TimeStamp): Option[Array[Byte]] = {
+  override def recover(time: MilliSeconds): Option[Array[Byte]] = {
     var checkpoint: Option[Array[Byte]] = None
     optConsumer.foreach { consumer =>
       while (consumer.hasNext && checkpoint.isEmpty) {
         val kafkaMsg = consumer.next()
         checkpoint = for {
           k <- kafkaMsg.key
-          t <- Injection.invert[TimeStamp, Array[Byte]](k).toOption
+          t <- Injection.invert[MilliSeconds, Array[Byte]](k).toOption
           c = kafkaMsg.msg if t >= time
         } yield c
       }

Reply via email to