Repository: incubator-gearpump
Updated Branches:
  refs/heads/master ffb3d8c91 -> 519a19500
[GEARPUMP-280] Set MAX_TIME_MILLIS to Long.MaxValue minus 1

Author: manuzhang <[email protected]>

Closes #159 from manuzhang/GEARPUMP-280.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/519a1950
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/519a1950
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/519a1950

Branch: refs/heads/master
Commit: 519a195004371d05a7b0a54e1e7b8291038c09f8
Parents: ffb3d8c
Author: manuzhang <[email protected]>
Authored: Fri Feb 24 12:43:46 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Feb 24 12:43:59 2017 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/gearpump/package.scala | 2 +-
 .../org/apache/gearpump/streaming/StreamApplication.scala | 3 ++-
 .../scala/org/apache/gearpump/streaming/source/Watermark.scala | 2 --
 .../org/apache/gearpump/streaming/task/SubscriptionSpec.scala | 6 +++---
 4 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/core/src/main/scala/org/apache/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
index 2f74ac4..6e20277 100644
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -22,7 +22,7 @@ package object gearpump {
   type TimeStamp = Long
   // maximum time won't overflow when converted to milli-seconds
-  val MAX_TIME_MILLIS: Long = Long.MaxValue
+  val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
   // minimum time won't overflow when converted to milli-seconds
   val MIN_TIME_MILLIS: Long = Long.MinValue

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 23350dd..435414b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -113,7 +113,8 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) {
 }
 object LifeTime {
-  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS)
+  // MAX_TIME_MILLIS is Long.MaxValue - 1
+  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
 }
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 9c27bde..4371257 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -31,9 +31,7 @@ case class Watermark(instant: Instant) {
 object Watermark {
-  // maximum time won't overflow when converted to milli-seconds
   val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
-  // minimum time won't overflow when converted to milli-seconds
   val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/519a1950/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 3635db9..9f02cef 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.Message
+import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
@@ -119,12 +119,12 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
   }
   it should "report minClock as Long.MaxValue when there is no pending message" in {
-    val (subscription, transport) = prepare
+    val (subscription, _) = prepare
     val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70))
     subscription.sendMessage(msg1)
     assert(subscription.minClock == 70)
     subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
-    assert(subscription.minClock == Long.MaxValue)
+    assert(subscription.minClock == MAX_TIME_MILLIS)
   }
   private def randomMessage: String = new Random().nextInt.toString
