Repository: incubator-gearpump Updated Branches: refs/heads/master c27cd808c -> ce3b82251
[GEARPUMP-268] Accept java.time.Instant in Message Author: manuzhang <[email protected]> Closes #142 from manuzhang/instant. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ce3b8225 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ce3b8225 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ce3b8225 Branch: refs/heads/master Commit: ce3b822513ef67d485d41fd5817e0cbeca981e03 Parents: c27cd80 Author: manuzhang <[email protected]> Authored: Thu Feb 9 20:26:43 2017 +0800 Committer: manuzhang <[email protected]> Committed: Thu Feb 9 20:26:59 2017 +0800 ---------------------------------------------------------------------- .../scala/org/apache/gearpump/Message.scala | 47 ++++++++++++++++++-- .../cluster/client/RunningApplication.scala | 2 +- .../state/processor/CountProcessor.scala | 2 +- .../processor/NumberGeneratorProcessor.scala | 2 +- .../processor/WindowAverageProcessor.scala | 2 +- .../examples/wordcountjava/dsl/WordCount.java | 2 +- .../streaming/examples/wordcount/Split.scala | 4 +- .../wordcount/dsl/WindowedWordCount.scala | 14 +++--- .../akkastream/task/DelayInitialTask.scala | 2 +- .../akkastream/task/DropWithinTask.scala | 2 +- .../akkastream/task/SingleSourceTask.scala | 6 --- .../akkastream/task/SourceBridgeTask.scala | 2 +- .../akkastream/task/StatefulMapConcatTask.scala | 2 +- .../akkastream/task/TakeWithinTask.scala | 2 +- .../akkastream/task/TickSourceTask.scala | 2 +- .../storm/topology/GearpumpStormComponent.scala | 2 +- .../storm/util/StormOutputCollector.scala | 8 ++-- .../hadoop/HadoopCheckpointStore.scala | 5 ++- .../lib/format/DefaultSequenceFormatter.scala | 2 +- .../hadoop/lib/rotation/FileSizeRotation.scala | 4 +- .../hadoop/lib/rotation/Rotation.scala | 4 +- .../lib/rotation/FileSizeRotationSpec.scala | 7 +-- .../kafka/lib/source/AbstractKafkaSource.scala | 5 ++- .../lib/source/DefaultKafkaMessageDecoder.scala | 2 +- .../source/DefaultKafkaMessageDecoderSpec.scala | 6 +-- .../streaming/dsl/scalaapi/StreamApp.scala | 2 +- .../streaming/dsl/task/TransformTask.scala | 4 +- .../streaming/dsl/window/impl/Window.scala | 2 +- .../dsl/window/impl/WindowRunner.scala | 2 +- .../gearpump/streaming/source/Watermark.scala | 2 +- .../streaming/state/api/PersistentTask.scala | 2 +- .../streaming/task/ExpressTransport.scala | 2 +- .../gearpump/streaming/task/Subscription.scala | 7 +-- .../streaming/task/SubscriptionSpec.scala | 8 ++-- 34 files changed, 104 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/core/src/main/scala/org/apache/gearpump/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala index 871ebe1..9965565 100644 --- a/core/src/main/scala/org/apache/gearpump/Message.scala +++ b/core/src/main/scala/org/apache/gearpump/Message.scala @@ -18,15 +18,56 @@ package org.apache.gearpump +import java.time.Instant + /** - * Each message contains a immutable timestamp. + * Each message contains an immutable timestamp. * * For example, if you take a picture, the time you take the picture is the * message's timestamp. + * * @param msg Accept any type except Null, Nothing and Unit */ -case class Message(msg: Any, timestamp: TimeStamp = Message.noTimeStamp) +case class Message(msg: Any, timeInMillis: TimeStamp) { + + /** + * @param msg Accept any type except Null, Nothing and Unit + * @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue) + */ + def this(msg: Any, timestamp: Instant) = { + this(msg, timestamp.toEpochMilli) + } + + /** + * Instant.EPOCH is used for default timestamp + * + * @param msg Accept any type except Null, Nothing and Uni + */ + def this(msg: Any) = { + this(msg, Instant.EPOCH) + } + + def timestamp: Instant = { + Instant.ofEpochMilli(timeInMillis) + } +} object Message { - val noTimeStamp: TimeStamp = 0L + + /** + * Instant.EPOCH is used for default timestamp + * + * @param msg Accept any type except Null, Nothing and Uni + */ + def apply(msg: Any): Message = { + new Message(msg) + } + + /** + * @param msg Accept any type except Null, Nothing and Unit + * @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue) + */ + def apply(msg: Any, timestamp: Instant): Message = { + new Message(msg, timestamp) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala index 973e1e8..1c6c959 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.cluster.client -import akka.actor.{ActorRef, ActorSystem} +import akka.actor.ActorRef import akka.pattern.ask import akka.util.Timeout import org.apache.gearpump.cluster.ClientToMaster.{RegisterAppResultListener, ResolveAppId, ShutdownApplication} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala index 2d31eeb..4efc6e1 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala @@ -37,7 +37,7 @@ class CountProcessor(taskContext: TaskContext, conf: UserConfig) } override def processMessage(state: PersistentState[Int], message: Message): Unit = { - state.update(message.timestamp, 1) + state.update(message.timestamp.toEpochMilli, 1) state.get.foreach(s => taskContext.output(Message(serializer.serialize(s), message.timestamp))) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala index e6030d6..eb927c6 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala @@ -36,7 +36,7 @@ class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig) } override def onNext(msg: Message): Unit = { - output(Message(num + "", num)) + output(Message(num + "", Instant.ofEpochMilli(num))) num += 1 import scala.concurrent.duration._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala index eea2504..8ddbedd 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala @@ -49,6 +49,6 @@ class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig) override def processMessage(state: PersistentState[AveragedValue], message: Message): Unit = { val value = AveragedValue(message.msg.asInstanceOf[String].toLong) - state.update(message.timestamp, value) + state.update(message.timestamp.toEpochMilli, value) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index 2942861..7d8400d 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -79,7 +79,7 @@ public class WordCount { @Override public Message read() { - return Message.apply(str, Instant.now().toEpochMilli()); + return Message.apply(str, Instant.now()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala index c07e124..4bb28cd 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala @@ -43,10 +43,10 @@ class Split extends DataSource { if (item < result.size - 1) { item += 1 - Message(result(item), System.currentTimeMillis()) + Message(result(item), Instant.now()) } else { item = 0 - Message(result(item), System.currentTimeMillis()) + Message(result(item), Instant.now()) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala index e1aac4c..2aa1bb4 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -52,12 +52,12 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { private class TimedDataSource extends DataSource { private var data = List( - Message("foo", 1L), - Message("bar", 2L), - Message("foo", 3L), - Message("foo", 5L), - Message("bar", 7L), - Message("bar", 8L) + Message("foo", Instant.ofEpochMilli(1L)), + Message("bar", Instant.ofEpochMilli(2L)), + Message("foo", Instant.ofEpochMilli(3L)), + Message("foo", Instant.ofEpochMilli(5L)), + Message("bar", Instant.ofEpochMilli(7L)), + Message("bar", Instant.ofEpochMilli(8L)) ) private var watermark: Instant = Instant.ofEpochMilli(0) @@ -66,7 +66,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { if (data.nonEmpty) { val msg = data.head data = data.tail - watermark = Instant.ofEpochMilli(msg.timestamp) + watermark = msg.timestamp msg } else { null http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala index 7c335dc..ae91d1f 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala @@ -39,7 +39,7 @@ class DelayInitialTask[T](context: TaskContext, userConf : UserConfig) override def onStart(startTime: Instant): Unit = { context.scheduleOnce(delayInitial)( - self ! Message(DelayInitialTime, System.currentTimeMillis()) + self ! Message(DelayInitialTime, Instant.now()) ) } override def onNext(msg : Message) : Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala index 0c54829..4c19de5 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala @@ -38,7 +38,7 @@ class DropWithinTask[T](context: TaskContext, userConf : UserConfig) override def onStart(startTime: Instant): Unit = { context.scheduleOnce(timeout)( - self ! Message(DropWithinTimeout, System.currentTimeMillis()) + self ! Message(DropWithinTimeout, Instant.now()) ) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala index 458bb4e..5bea47e 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala @@ -18,16 +18,10 @@ package org.apache.gearpump.akkastream.task -import java.time.Instant -import java.util.Date -import java.util.concurrent.TimeUnit - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.task.TaskContext -import scala.concurrent.duration.FiniteDuration - class SingleSourceTask[T](context: TaskContext, userConf : UserConfig) extends GraphTask(context, userConf) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala index 054b483..5b64a52 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala @@ -64,7 +64,7 @@ class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig) LOG.error("the stream has error", ex) case AkkaStreamMessage(msg) => LOG.info("we have received message from akka stream source: " + msg) - taskContext.output(Message(msg, System.currentTimeMillis())) + taskContext.output(Message(msg, Instant.now())) case Complete(description) => LOG.info("the stream is completed: " + description) case msg => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala index a0674bc..b776f2c 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala @@ -40,7 +40,7 @@ class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig val iterator = out.iterator while(iterator.hasNext) { val nextValue = iterator.next - context.output(Message(nextValue, System.currentTimeMillis())) + context.output(Message(nextValue, Instant.now())) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala index 9559d8f..7aa4e8e 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala @@ -38,7 +38,7 @@ class TakeWithinTask[T](context: TaskContext, userConf : UserConfig) override def onStart(startTime: Instant): Unit = { context.scheduleOnce(timeout)( - self ! Message(DropWithinTimeout, System.currentTimeMillis()) + self ! Message(DropWithinTimeout, Instant.now()) ) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala index d99d2db..a10e138 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala @@ -40,7 +40,7 @@ class TickSourceTask[T](context: TaskContext, userConf : UserConfig) override def onStart(startTime: Instant): Unit = { context.schedule(initialDelay, interval)( - self ! Message(tick, System.currentTimeMillis()) + self ! Message(tick, Instant.now()) ) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala index a8e061c..4536277 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -217,7 +217,7 @@ object GearpumpStormComponent { } override def next(message: Message): Unit = { - val timestamp = message.timestamp + val timestamp = message.timestamp.toEpochMilli collector.setTimestamp(timestamp) bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext, timestamp)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala index f95b840..1b239b5 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -18,16 +18,16 @@ package org.apache.gearpump.experiments.storm.util +import java.time.Instant import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => JList, Map => JMap} -import scala.collection.JavaConverters._ +import scala.collection.JavaConverters._ import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject} import backtype.storm.grouping.CustomStreamGrouping import backtype.storm.task.TopologyContext import backtype.storm.tuple.Fields import backtype.storm.utils.Utils import org.slf4j.Logger - import org.apache.gearpump._ import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.experiments.storm.util.StormUtil._ @@ -182,7 +182,7 @@ class StormOutputCollector( if (targets.containsKey(streamId)) { val (targetPartitions, targetStormTaskIds) = getTargetPartitionsFn(streamId, values) val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) - taskContext.output(Message(tuple, timestamp)) + taskContext.output(Message(tuple, Instant.ofEpochMilli(timestamp))) targetStormTaskIds } else { EMPTY_LIST @@ -206,7 +206,7 @@ class StormOutputCollector( val partition = stormTaskIdToGearpump(id).index val targetPartitions = Map(target -> Array(partition)) val tuple = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) - taskContext.output(Message(tuple, timestamp)) + taskContext.output(Message(tuple, Instant.ofEpochMilli(timestamp))) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala index a18cce6..e26a2ee 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala @@ -18,10 +18,11 @@ package org.apache.gearpump.streaming.hadoop +import java.time.Instant + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.slf4j.Logger - import org.apache.gearpump.TimeStamp import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter} @@ -82,7 +83,7 @@ class HadoopCheckpointStore( curWriter.foreach { w => val offset = w.write(timestamp, checkpoint) - rotation.mark(timestamp, offset) + rotation.mark(Instant.ofEpochMilli(timestamp), offset) } if (rotation.shouldRotate) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala index 318c071..822eb5f 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.io.{LongWritable, Text, Writable} import org.apache.gearpump.Message class DefaultSequenceFormatter extends OutputFormatter { - override def getKey(message: Message): Writable = new LongWritable(message.timestamp) + override def getKey(message: Message): Writable = new LongWritable(message.timestamp.toEpochMilli) override def getValue(message: Message): Writable = new Text(message.msg.asInstanceOf[String]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala index 72be9c3..285b8ef 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala @@ -18,13 +18,13 @@ package org.apache.gearpump.streaming.hadoop.lib.rotation -import org.apache.gearpump.TimeStamp +import java.time.Instant case class FileSizeRotation(maxBytes: Long) extends Rotation { private var bytesWritten = 0L - override def mark(timestamp: TimeStamp, offset: Long): Unit = { + override def mark(timestamp: Instant, offset: Long): Unit = { bytesWritten = offset } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala index cd8c04a..20de3d7 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/Rotation.scala @@ -18,10 +18,10 @@ package org.apache.gearpump.streaming.hadoop.lib.rotation -import org.apache.gearpump.TimeStamp +import java.time.Instant trait Rotation extends Serializable { - def mark(timestamp: TimeStamp, offset: Long): Unit + def mark(timestamp: Instant, offset: Long): Unit def shouldRotate: Boolean def rotate(): Unit } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala index 4eab3c9..a469956 100644 --- a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala +++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala @@ -18,10 +18,11 @@ package org.apache.gearpump.streaming.hadoop.lib.rotation +import java.time.Instant + import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} - import org.apache.gearpump.TimeStamp class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { @@ -33,9 +34,9 @@ class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers { forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) => val rotation = new FileSizeRotation(fileSize) rotation.shouldRotate shouldBe false - rotation.mark(timestamp, rotation.maxBytes / 2) + rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes / 2) rotation.shouldRotate shouldBe false - rotation.mark(timestamp, rotation.maxBytes) + rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes) rotation.shouldRotate shouldBe true rotation.rotate rotation.shouldRotate shouldBe false http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala index ba49899..a8bda50 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala @@ -130,8 +130,9 @@ abstract class AbstractKafkaSource( msg } - private def checkpointOffsets(tp: TopicAndPartition, time: TimeStamp, offset: Long): Unit = { - checkpointStores.get(tp).foreach(_.persist(time, Injection[Long, Array[Byte]](offset))) + private def checkpointOffsets(tp: TopicAndPartition, time: Instant, offset: Long): Unit = { + checkpointStores.get(tp).foreach(_.persist(time.toEpochMilli, + Injection[Long, Array[Byte]](offset))) } private def maybeSetupCheckpointStores(tps: Array[TopicAndPartition]): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala index 5e13230..2b63967 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala @@ -27,7 +27,7 @@ class DefaultKafkaMessageDecoder extends KafkaMessageDecoder { override def fromBytes(key: Array[Byte], value: Array[Byte]): MessageAndWatermark = { val time = Instant.now() - MessageAndWatermark(Message(value, time.toEpochMilli), time) + MessageAndWatermark(Message(value, time), time) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala index 81b2661..9f29022 100644 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.streaming.kafka.lib.source +import java.time.Instant + import com.twitter.bijection.Injection import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks @@ -29,14 +31,12 @@ class DefaultKafkaMessageDecoderSpec extends PropSpec with PropertyChecks with M forAll(Gen.chooseNum[Int](0, 100), Gen.alphaStr) { (k: Int, v: String) => val kbytes = Injection[Int, Array[Byte]](k) val vbytes = Injection[String, Array[Byte]](v) - val timestamp = System.currentTimeMillis() val msgAndWmk = decoder.fromBytes(kbytes, vbytes) val message = msgAndWmk.message val watermark = msgAndWmk.watermark message.msg shouldBe vbytes // processing time as message timestamp and watermark - message.timestamp shouldBe watermark.toEpochMilli - message.timestamp should be >= timestamp + message.timestamp shouldBe watermark } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala index d6eed2e..52972b7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -97,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { override def read(): Message = { if (iterator.hasNext) { - Message(iterator.next(), Instant.now().toEpochMilli) + Message(iterator.next(), Instant.now()) } else { null } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index 5febeb6..c4278dd 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -43,10 +43,10 @@ class TransformTask[IN, OUT](operator: Option[FunctionRunner[IN, OUT]], operator match { case Some(op) => op.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg, time)) + taskContext.output(Message(msg, time)) } case None => - taskContext.output(new Message(msg.msg, time)) + taskContext.output(Message(msg.msg, time)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index 3e9d8fb..e1d9cee 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -89,7 +89,7 @@ case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T] val group = groupByFn(ele) val windows = window.windowFn(new WindowFunction.Context[T] { override def element: T = ele - override def timestamp: Instant = Instant.ofEpochMilli(message.timestamp) + override def timestamp: Instant = message.timestamp }) windows.map(WindowAndGroup(_, group)).toList } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 7013645..1ada42c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -128,7 +128,7 @@ class DefaultWindowRunner[IN, GROUP, OUT]( } def emitResult(result: OUT, time: Instant): Unit = { - taskContext.output(Message(result, time.toEpochMilli)) + taskContext.output(Message(result, time)) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala index 36099c1..912bb12 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala @@ -25,5 +25,5 @@ import org.apache.gearpump.Message * message used by source task to report source watermark. */ case class Watermark(instant: Instant) { - def toMessage: Message = Message("watermark", instant.toEpochMilli) + def toMessage: Message = Message("watermark", instant) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala index 5eaed40..f9f5b33 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -79,7 +79,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) } final override def onNext(message: Message): Unit = { - checkpointManager.update(message.timestamp) + checkpointManager.update(message.timeInMillis) .foreach(state.setNextCheckpointTime) processMessage(state, message) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala index 7629cf9..3cc1cd0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala @@ -56,7 +56,7 @@ trait ExpressTransport { msg match { case message: Message => val bytes = serializerPool.get().serialize(message.msg) - serializedMessage = SerializedMessage(message.timestamp, bytes) + serializedMessage = SerializedMessage(message.timeInMillis, bytes) case _ => serializedMessage = msg } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala index 4193fbf..3e68580 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala @@ -103,13 +103,14 @@ class Subscription( var count = 0 // Only sends message whose timestamp matches the lifeTime - if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timestamp)) { + if (partition != Partitioner.UNKNOWN_PARTITION_ID && life.contains(msg.timeInMillis)) { val targetTask = TaskId(processorId, partition) transport.transport(msg, targetTask) - this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timestamp) - this.candidateMinClock(partition) = Math.min(this.candidateMinClock(partition), msg.timestamp) + this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timeInMillis) + this.candidateMinClock(partition) = + Math.min(this.candidateMinClock(partition), msg.timeInMillis) incrementMessageCount(partition, 1) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ce3b8225/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index d128ace..3635db9 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -18,12 +18,12 @@ package org.apache.gearpump.streaming.task +import java.time.Instant import java.util.Random import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} @@ -66,13 +66,13 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { it should "send message and handle ack correctly" in { val (subscription, transport) = prepare - val msg1 = new Message("1", timestamp = 70) + val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70)) subscription.sendMessage(msg1) verify(transport, times(1)).transport(msg1, TaskId(1, 1)) assert(subscription.minClock == 70) - val msg2 = new Message("0", timestamp = 50) + val msg2 = new Message("0", timestamp = Instant.ofEpochMilli(50)) subscription.sendMessage(msg2) verify(transport, times(1)).transport(msg2, TaskId(1, 0)) @@ -120,7 +120,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { it should "report minClock as Long.MaxValue when there is no pending message" in { val (subscription, transport) = prepare - val msg1 = new Message("1", timestamp = 70) + val msg1 = new Message("1", timestamp = Instant.ofEpochMilli(70)) subscription.sendMessage(msg1) assert(subscription.minClock == 70) subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
