Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 23daf0cf9 -> 529799cc4


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index 647ad0a..e461ae8 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -159,9 +159,6 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       // clock status: task(0,0) -> 1, task(0,1)->0, task(1,0)->0, task(1,1)->0
       appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
 
-      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
-      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-
       // check min clock
       appMaster.tell(GetLatestMinClock, mockTask.ref)
       mockTask.expectMsg(LatestMinClock(0))
@@ -169,9 +166,6 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
       appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
 
-      // there is no further upstream, so the upstreamMinClock is Long.MaxValue
-      mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
-
       // check min clock
       appMaster.tell(GetLatestMinClock, mockTask.ref)
       mockTask.expectMsg(LatestMinClock(0))
@@ -238,7 +232,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       for (i <- 1 to 5) {
         val taskId = TaskId(0, 0)
         appMaster.tell(UpdateClock(taskId, i), mockTask.ref)
-        mockTask.expectMsg(UpstreamMinClock(Long.MaxValue))
+
         val cause = s"message loss $i from $taskId"
         appMaster.tell(MessageLoss(0, taskId, cause), mockTask.ref)
         // appmaster restarted
@@ -300,9 +294,7 @@ object AppMasterSpec {
 }
 
 class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-  override def onNext(msg: Message): Unit = {}
 }
 
 class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-  override def onNext(msg: Message): Unit = {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index e742a2c..d42fe6f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -58,9 +58,6 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       // task(0,0): clock(101); task(1,0): clock(100)
       clockService ! UpdateClock(TaskId(0, 0), 101)
 
-      // There is no upstream, so pick Long.MaxValue
-      expectMsg(UpstreamMinClock(Long.MaxValue))
-
       // Min clock is updated
       clockService ! GetLatestMinClock
       expectMsg(LatestMinClock(100))
@@ -83,7 +80,6 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       val clockService = system.actorOf(Props(new ClockService(dag, store)))
       val task = TestProbe()
       clockService.tell(UpdateClock(TaskId(0, 0), 200), task.ref)
-      task.expectMsgType[UpstreamMinClock]
 
       val task3 = ProcessorDescription(id = 3, taskClass = classOf[TaskActor].getName,
         parallelism = 1)
@@ -122,7 +118,6 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       store.put(ClockService.START_CLOCK, startClock)
       val clockService = system.actorOf(Props(new ClockService(dag, store)))
       clockService ! UpdateClock(TaskId(0, 0), 200L)
-      expectMsgType[UpstreamMinClock]
       clockService ! UpdateClock(TaskId(1, 0), 200L)
       expectMsgType[UpstreamMinClock]
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index bb495a7..54ecde1 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -270,11 +270,9 @@ object TaskManagerSpec {
 
   class Task1(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onNext(msg: Message): Unit = {}
   }
 
   class Task2(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onNext(msg: Message): Unit = {}
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
deleted file mode 100644
index d6fa443..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.source
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump.{Message, TimeStamp}
-
-class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with Matchers {
-  property("DefaultTimeStampFilter should filter message against give timestamp") {
-    val timestampGen = Gen.chooseNum[Long](0L, 1000L)
-    val messageGen = for {
-      msg <- Gen.alphaStr
-      time <- timestampGen
-    } yield Message(msg, time)
-
-    val filter = new DefaultTimeStampFilter()
-
-    forAll(timestampGen, messageGen) {
-      (predicate: TimeStamp, message: Message) =>
-        if (message.timestamp >= predicate) {
-          filter.filter(message, predicate) shouldBe Some(message)
-        } else {
-          filter.filter(message, predicate) shouldBe None
-        }
-
-        filter.filter(null, predicate) shouldBe None
-    }
-  }
-}

Reply via email to