Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 000e846ab -> 176d82763


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index c6817f5..79bcc2a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -19,14 +19,13 @@
 package org.apache.gearpump.streaming.task
 
 import org.slf4j.Logger
-
 import com.google.common.primitives.Shorts
 import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
 import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
 import org.apache.gearpump.streaming.LifeTime
 import org.apache.gearpump.streaming.task.Subscription._
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp}
 
 /**
  * Manges the output and message clock for single downstream processor
@@ -103,14 +102,16 @@ class Subscription(
 
     var count = 0
     // Only sends message whose timestamp matches the lifeTime
-    if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timeInMillis)) {
+    if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(
+      msg.timestamp.toEpochMilli)) {
 
       val targetTask = TaskId(processorId, partition)
       transport.transport(msg, targetTask)
 
-      this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timeInMillis)
+      this.minClockValue(partition) = Math.min(this.minClockValue(partition),
+        msg.timestamp.toEpochMilli)
       this.candidateMinClock(partition) =
-        Math.min(this.candidateMinClock(partition), msg.timeInMillis)
+        Math.min(this.candidateMinClock(partition), msg.timestamp.toEpochMilli)
 
       incrementMessageCount(partition, 1)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index 5b174bd..90a8bff 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -24,7 +24,6 @@ import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor.Receive
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
 import org.slf4j.Logger
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 10648b4..8ef45f3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -35,7 +35,7 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.streaming.task.TaskActor._
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message, TimeStamp}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 1b3f30c..f5f099c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -26,7 +26,7 @@ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{TimeStamp, Message}
+import org.apache.gearpump.{Message, TimeStamp}
 
 /**
  * This provides TaskContext for user defined tasks

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index bcf96e4..c223a53 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, Proce
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.TimeStamp
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index a9b23fe..f5d7c20 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -268,7 +268,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       source.onWatermarkProgress(Watermark.MAX)
       data.foreach { s =>
         verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
-          message => message.msg == s))
+          message => message.value == s))
       }
 
       // Source with transformer
@@ -282,7 +282,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       another.onWatermarkProgress(Watermark.MAX)
       data.foreach { s =>
         verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
-          message => message.msg == s))
+          message => message.value == s))
       }
     }
   }
@@ -317,7 +317,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       import scala.collection.JavaConverters._
 
       val values = peopleCaptor.getAllValues.asScala.map(input =>
-        input.msg.asInstanceOf[Option[String]].get)
+        input.value.asInstanceOf[Option[String]].get)
       assert(values.mkString(",") == "1,2,22,3,33,333")
       system.terminate()
       Await.result(system.whenTerminated, Duration.Inf)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index 62a3bcb..fb398b8 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -115,10 +115,10 @@ object StreamSpec {
     var query: String = _
 
     override def onNext(msg: Message): Unit = {
-      msg.msg match {
+      msg.value match {
         case Left(wordCount: (String @unchecked, Int @unchecked)) =>
           if (query != null && wordCount._1 == query) {
-            taskContext.output(new Message(wordCount))
+            taskContext.output(Message(wordCount))
           }
 
         case Right(query: String) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index f0bccd7..281d69a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -64,12 +64,12 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with
 
         msgs.foreach { msg =>
           runner.foreach(r =>
-            when(r.process(msg.msg)).thenReturn(Some(msg.msg)))
+            when(r.process(msg.value)).thenReturn(Some(msg.value)))
         }
         task.onWatermarkProgress(Watermark.MAX)
 
         msgs.foreach { msg =>
-          verify(taskContext).output(MockitoMatchers.eq(Message(msg.msg, Watermark.MAX)))
+          verify(taskContext).output(MockitoMatchers.eq(Message(msg.value, Watermark.MAX)))
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 9f02cef..fb0beaa 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -66,13 +66,13 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 
   it should "send message and handle ack correctly" in {
     val (subscription, transport) = prepare
-    val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
+    val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
     subscription.sendMessage(msg1)
 
     verify(transport, times(1)).transport(msg1, TaskId(1, 1))
     assert(subscription.minClock == 70)
 
-    val msg2 = new Message("0", timestamp = Instant.ofEpochMilli(50))
+    val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
     subscription.sendMessage(msg2)
     verify(transport, times(1)).transport(msg2, TaskId(1, 0))
 
@@ -120,7 +120,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 
   it should "report minClock as Long.MaxValue when there is no pending message" in {
     val (subscription, _) = prepare
-    val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
+    val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
     subscription.sendMessage(msg1)
     assert(subscription.minClock == 70)
     subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))

Reply via email to