Repository: incubator-gearpump
Updated Branches:
  refs/heads/master ffb3d8c91 -> 519a19500


[GEARPUMP-280] Set MAX_TIME_MILLIS to Long.MaxValue minus 1

Author: manuzhang <[email protected]>

Closes #159 from manuzhang/GEARPUMP-280.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/519a1950
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/519a1950
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/519a1950

Branch: refs/heads/master
Commit: 519a195004371d05a7b0a54e1e7b8291038c09f8
Parents: ffb3d8c
Author: manuzhang <[email protected]>
Authored: Fri Feb 24 12:43:46 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Feb 24 12:43:59 2017 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/gearpump/package.scala          | 2 +-
 .../org/apache/gearpump/streaming/StreamApplication.scala      | 3 ++-
 .../scala/org/apache/gearpump/streaming/source/Watermark.scala | 2 --
 .../org/apache/gearpump/streaming/task/SubscriptionSpec.scala  | 6 +++---
 4 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/core/src/main/scala/org/apache/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala 
b/core/src/main/scala/org/apache/gearpump/package.scala
index 2f74ac4..6e20277 100644
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -22,7 +22,7 @@ package object gearpump {
   type TimeStamp = Long
 
   // maximum time won't overflow when converted to milli-seconds
-  val MAX_TIME_MILLIS: Long = Long.MaxValue
+  val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
 
   // minimum time won't overflow when converted to milli-seconds
   val MIN_TIME_MILLIS: Long = Long.MinValue

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 23350dd..435414b 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -113,7 +113,8 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) {
 }
 
 object LifeTime {
-  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS)
+  // MAX_TIME_MILLIS is Long.MaxValue - 1
+  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 9c27bde..4371257 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -31,9 +31,7 @@ case class Watermark(instant: Instant) {
 
 object Watermark {
 
-  // maximum time won't overflow when converted to milli-seconds
   val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
 
-  // minimum time won't overflow when converted to milli-seconds
   val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 3635db9..9f02cef 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.Message
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
@@ -119,12 +119,12 @@ class SubscriptionSpec extends FlatSpec with Matchers 
with MockitoSugar {
   }
 
   it should "report minClock as Long.MaxValue when there is no pending 
message" in {
-    val (subscription, transport) = prepare
+    val (subscription, _) = prepare
     val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
     subscription.sendMessage(msg1)
     assert(subscription.minClock == 70)
     subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
-    assert(subscription.minClock == Long.MaxValue)
+    assert(subscription.minClock == MAX_TIME_MILLIS)
   }
 
   private def randomMessage: String = new Random().nextInt.toString

Reply via email to