[GEARPUMP-312] Add Message trait and DefaultMessage impl Author: manuzhang <[email protected]>
Closes #183 from manuzhang/message_trait. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/176d8276 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/176d8276 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/176d8276 Branch: refs/heads/master Commit: 176d82763550ff59b65038c264c63d6b11951996 Parents: 000e846 Author: manuzhang <[email protected]> Authored: Thu May 18 14:02:51 2017 +0800 Committer: manuzhang <[email protected]> Committed: Thu May 18 14:03:34 2017 +0800 ---------------------------------------------------------------------- .../scala/org/apache/gearpump/Message.scala | 47 ++++++++---- .../streaming/examples/complexdag/Node.scala | 4 +- .../streaming/examples/complexdag/Sink.scala | 2 +- .../streaming/examples/complexdag/Source.scala | 2 +- .../examples/complexdag/NodeSpec.scala | 2 +- .../examples/complexdag/SourceSpec.scala | 3 +- .../examples/fsio/SeqFileStreamProcessor.scala | 2 +- .../fsio/SeqFileStreamProducerSpec.scala | 2 +- .../examples/kafka/wordcount/Split.scala | 4 +- .../examples/kafka/wordcount/Sum.scala | 4 +- .../examples/sol/SOLStreamProcessor.scala | 3 +- .../examples/sol/SOLStreamProducer.scala | 2 +- .../state/processor/CountProcessor.scala | 3 +- .../processor/WindowAverageProcessor.scala | 4 +- .../state/DefaultMessageCountAppSpec.scala | 79 ++++++++++++++++++++ .../examples/state/MessageCountAppSpec.scala | 79 -------------------- .../state/processor/CountProcessorSpec.scala | 1 - .../streaming/examples/wordcountjava/Split.java | 3 +- .../streaming/examples/wordcountjava/Sum.java | 4 +- .../examples/wordcountjava/dsl/WordCount.java | 3 +- .../streaming/examples/wordcount/Sum.scala | 7 +- .../gearpump/akkastream/task/BatchTask.scala | 9 +-- .../gearpump/akkastream/task/ConcatTask.scala | 2 +- .../akkastream/task/DelayInitialTask.scala | 5 +- .../akkastream/task/DropWithinTask.scala | 4 +- .../akkastream/task/FlattenMergeTask.scala | 2 +- .../gearpump/akkastream/task/FoldTask.scala | 6 +- .../akkastream/task/GroupedWithinTask.scala | 2 +- .../akkastream/task/InterleaveTask.scala | 2 +- .../gearpump/akkastream/task/MapAsyncTask.scala | 6 +- .../gearpump/akkastream/task/MergeTask.scala | 2 +- .../akkastream/task/SingleSourceTask.scala | 2 +- .../akkastream/task/SinkBridgeTask.scala | 4 +- .../akkastream/task/SourceBridgeTask.scala | 2 +- .../akkastream/task/StatefulMapConcatTask.scala | 4 +- .../akkastream/task/TakeWithinTask.scala | 4 +- .../gearpump/akkastream/task/ThrottleTask.scala | 5 +- .../akkastream/task/TickSourceTask.scala | 4 +- .../gearpump/akkastream/task/Unzip2Task.scala | 6 +- .../gearpump/akkastream/task/Zip2Task.scala | 4 +- .../experimental/rabbitmq/RMQSink.scala | 2 +- .../org/apache/gearpump/redis/RedisSink.scala | 2 +- .../storm/partitioner/StormPartitioner.scala | 2 +- .../storm/processor/StormProcessor.scala | 2 +- .../storm/topology/GearpumpStormComponent.scala | 4 +- .../partitioner/StormPartitionerSpec.scala | 3 +- .../storm/util/StormOutputCollectorSpec.scala | 13 ++-- .../streaming/hadoop/SequenceFileSink.scala | 1 - .../lib/format/DefaultSequenceFormatter.scala | 3 +- .../hadoop/lib/format/OutputFormatter.scala | 1 - .../gearpump/external/hbase/HBaseSink.scala | 2 +- .../kafka/lib/sink/AbstractKafkaSink.scala | 2 +- .../kafka/lib/source/AbstractKafkaSource.scala | 3 +- .../source/DefaultKafkaMessageDecoderSpec.scala | 4 +- .../dsl/partitioner/GroupByPartitioner.scala | 2 +- .../streaming/dsl/scalaapi/Stream.scala | 2 +- .../streaming/dsl/task/TransformTask.scala | 23 +++--- .../streaming/dsl/window/impl/Window.scala | 2 +- .../dsl/window/impl/WindowRunner.scala | 2 +- .../partitioner/BroadcastPartitioner.scala | 4 +- .../streaming/partitioner/HashPartitioner.scala | 2 +- .../gearpump/streaming/source/Watermark.scala | 3 +- .../streaming/state/api/PersistentTask.scala | 4 +- .../streaming/task/ExpressTransport.scala | 5 +- .../gearpump/streaming/task/Subscription.scala | 11 +-- .../apache/gearpump/streaming/task/Task.scala | 1 - .../gearpump/streaming/task/TaskActor.scala | 2 +- .../gearpump/streaming/task/TaskWrapper.scala | 2 +- .../streaming/appmaster/TaskManagerSpec.scala | 2 +- .../dsl/plan/functions/FunctionRunnerSpec.scala | 6 +- .../streaming/dsl/scalaapi/StreamSpec.scala | 4 +- .../streaming/dsl/task/TransformTaskSpec.scala | 4 +- .../streaming/task/SubscriptionSpec.scala | 6 +- 73 files changed, 233 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/core/src/main/scala/org/apache/gearpump/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala index 9965565..4dc5c09 100644 --- a/core/src/main/scala/org/apache/gearpump/Message.scala +++ b/core/src/main/scala/org/apache/gearpump/Message.scala @@ -20,34 +20,41 @@ package org.apache.gearpump import java.time.Instant +trait Message { + + val value: Any + + val timestamp: Instant +} + /** * Each message contains an immutable timestamp. * * For example, if you take a picture, the time you take the picture is the * message's timestamp. * - * @param msg Accept any type except Null, Nothing and Unit + * @param value Accept any type except Null, Nothing and Unit */ -case class Message(msg: Any, timeInMillis: TimeStamp) { +case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message { /** - * @param msg Accept any type except Null, Nothing and Unit + * @param value Accept any type except Null, Nothing and Unit * @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue) */ - def this(msg: Any, timestamp: Instant) = { - this(msg, timestamp.toEpochMilli) + def this(value: Any, timestamp: Instant) = { + this(value, timestamp.toEpochMilli) } /** * Instant.EPOCH is used for default timestamp * - * @param msg Accept any type except Null, Nothing and Uni + * @param value Accept any type except Null, Nothing and Uni */ - def this(msg: Any) = { - this(msg, Instant.EPOCH) + def this(value: Any) = { + this(value, Instant.EPOCH) } - def timestamp: Instant = { + override val timestamp: Instant = { Instant.ofEpochMilli(timeInMillis) } } @@ -57,17 +64,25 @@ object Message { /** * Instant.EPOCH is used for default timestamp * - * @param msg Accept any type except Null, Nothing and Uni + * @param value Accept any type except Null, Nothing and Unit */ - def apply(msg: Any): Message = { - new Message(msg) + def apply(value: Any): Message = { + new DefaultMessage(value) } /** - * @param msg Accept any type except Null, Nothing and Unit - * @param timestamp timestamp cannot be larger than Instant.ofEpochMilli(Long.MaxValue) + * @param value Accept any type except Null, Nothing and Unit + * @param timestamp timestamp must be smaller than Long.MaxValue + */ + def apply(value: Any, timestamp: TimeStamp): Message = { + DefaultMessage(value, timestamp) + } + + /** + * @param value Accept any type except Null, Nothing and Unit + * @param timestamp timestamp must be smaller than Instant.ofEpochMilli(Long.MaxValue) */ - def apply(msg: Any, timestamp: Instant): Message = { - new Message(msg, timestamp) + def apply(value: Any, timestamp: Instant): Message = { + new DefaultMessage(value, timestamp) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala index ddd4d1a..dbc0efa 100644 --- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala @@ -30,7 +30,7 @@ class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, override def onStart(startTime: Instant): Unit = {} override def onNext(msg: Message): Unit = { - val list = msg.msg.asInstanceOf[Vector[String]] - output(new Message(list :+ getClass.getCanonicalName, System.currentTimeMillis())) + val list = msg.value.asInstanceOf[Vector[String]] + output(Message(list :+ getClass.getCanonicalName, System.currentTimeMillis())) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala index e9b00a0..8cc23ff 100644 --- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala @@ -35,7 +35,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, } override def onNext(msg: Message): Unit = { - val l = msg.msg.asInstanceOf[Vector[String]] + val l = msg.value.asInstanceOf[Vector[String]] list.size match { case 1 => l.foreach(f => { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala index 074b389..e3bc29a 100644 --- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala @@ -35,7 +35,7 @@ class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContex override def onNext(msg: Message): Unit = { val list = Vector(getClass.getCanonicalName) val now = Instant.now - output(new Message(list, now.toEpochMilli)) + output(Message(list, now.toEpochMilli)) self ! Watermark(now) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala index 241e0f6..f5376d5 100644 --- a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala +++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/NodeSpec.scala @@ -36,6 +36,6 @@ class NodeSpec extends PropSpec with PropertyChecks with Matchers with BeforeAnd val list = Vector(classOf[Node].getCanonicalName) val expected = Vector(classOf[Node].getCanonicalName, classOf[Node].getCanonicalName) node.onNext(Message(list)) - verify(context).output(argMatch[Message](_.msg == expected)) + verify(context).output(argMatch[Message](_.value == expected)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala index 20cad1c..f445566 100644 --- a/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala +++ b/examples/streaming/complexdag/src/test/scala/org/apache/gearpump/streaming/examples/complexdag/SourceSpec.scala @@ -38,7 +38,8 @@ class SourceSpec extends WordSpec with Matchers { val source = new Source(context, UserConfig.empty) source.onNext(Message("start")) - verify(context).output(argMatch[Message](Vector(classOf[Source].getCanonicalName) == _.msg)) + verify(context).output(argMatch[Message]( + Vector(classOf[Source].getCanonicalName) == _.value)) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala index 561346e..229f073 100644 --- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala @@ -63,7 +63,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig) } override def onNext(msg: Message): Unit = { - val kv = msg.msg.asInstanceOf[String].split("\\+\\+") + val kv = msg.value.asInstanceOf[String].split("\\+\\+") if (kv.length >= 2) { key.set(kv(0)) value.set(kv(1)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala index a03e68d..215cbfd 100644 --- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala +++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala @@ -79,7 +79,7 @@ class SeqFileStreamProducerSpec val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet verify(context).output(argMatch[Message](msg => - expected.contains(msg.msg.asInstanceOf[String]))) + expected.contains(msg.value.asInstanceOf[String]))) } after { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala index b78e788..4250a43 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala @@ -32,8 +32,8 @@ class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext } override def onNext(msg: Message): Unit = { - Injection.invert[String, Array[Byte]](msg.msg.asInstanceOf[Array[Byte]]) + Injection.invert[String, Array[Byte]](msg.value.asInstanceOf[Array[Byte]]) .foreach(_.split("\\s+").foreach( - word => output(new Message(word, msg.timestamp)))) + word => output(Message(word, msg.timestamp)))) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala index 58bb884..d1a98d0 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala @@ -33,10 +33,10 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, override def onStart(startTime: Instant): Unit = {} override def onNext(message: Message): Unit = { - val word = message.msg.asInstanceOf[String] + val word = message.value.asInstanceOf[String] val count = wordcount.getOrElse(word, 0L) + 1 wordcount += word -> count - output(new Message( + output(Message( Injection[String, Array[Byte]](word) -> Injection[Long, Array[Byte]](count), message.timestamp)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala index a16cf4c..9fb581d 100644 --- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala @@ -20,10 +20,9 @@ package org.apache.gearpump.streaming.examples.sol import java.time.Instant import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.FiniteDuration import akka.actor.Cancellable - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.task.{Task, TaskContext} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala index 2b443e5..973a84e 100644 --- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala @@ -61,7 +61,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig) override def onNext(msg: Message): Unit = { val message = messages(rand.nextInt(messages.length)) - output(new Message(message, System.currentTimeMillis())) + output(Message(message, System.currentTimeMillis())) messageCount = messageCount + 1L self ! Watermark(Instant.now) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala index 4efc6e1..7f4bc22 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala @@ -38,7 +38,8 @@ class CountProcessor(taskContext: TaskContext, conf: UserConfig) override def processMessage(state: PersistentState[Int], message: Message): Unit = { state.update(message.timestamp.toEpochMilli, 1) - state.get.foreach(s => taskContext.output(Message(serializer.serialize(s), message.timestamp))) + state.get.foreach(s => taskContext.output( + Message(serializer.serialize(s), message.timestamp))) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala index 8ddbedd..0052c57 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessor.scala @@ -19,10 +19,8 @@ package org.apache.gearpump.streaming.examples.state.processor import scala.collection.immutable.TreeMap - import com.twitter.algebird.{AveragedGroup, AveragedValue} import org.slf4j.Logger - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.monoid.AlgebirdGroup @@ -48,7 +46,7 @@ class WindowAverageProcessor(taskContext: TaskContext, conf: UserConfig) override def processMessage(state: PersistentState[AveragedValue], message: Message): Unit = { - val value = AveragedValue(message.msg.asInstanceOf[String].toLong) + val value = AveragedValue(message.value.asInstanceOf[String].toLong) state.update(message.timestamp.toEpochMilli, value) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala new file mode 100644 index 0000000..619e5d4 --- /dev/null +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/DefaultMessageCountAppSpec.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.state + +import scala.concurrent.Future +import scala.util.Success + +import org.scalatest.prop.PropertyChecks +import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} + +import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication +import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult +import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.streaming.examples.state.MessageCountApp._ + +class DefaultMessageCountAppSpec + extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { + + before { + startActorSystem() + } + + after { + shutdownActorSystem() + } + + protected override def config = TestUtil.DEFAULT_CONFIG + + property("MessageCount should succeed to submit application with required arguments") { + val requiredArgs = Array( + s"-$SOURCE_TOPIC", "source", + s"-$SINK_TOPIC", "sink", + s"-$ZOOKEEPER_CONNECT", "localhost:2181", + s"-$BROKER_LIST", "localhost:9092", + s"-$DEFAULT_FS", "hdfs://localhost:9000" + ) + val optionalArgs = Array( + s"-$SOURCE_TASK", "2", + s"-$COUNT_TASK", "2", + s"-$SINK_TASK", "2" + ) + + val args = { + Table( + ("requiredArgs", "optionalArgs"), + (requiredArgs, optionalArgs.take(0)), + (requiredArgs, optionalArgs.take(2)), + (requiredArgs, optionalArgs.take(4)), + (requiredArgs, optionalArgs) + ) + } + + val masterReceiver = createMockMaster() + forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => + val args = requiredArgs ++ optionalArgs + Future { + MessageCountApp.main(masterConfig, args) + } + masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) + masterReceiver.reply(SubmitApplicationResult(Success(0))) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala deleted file mode 100644 index 729994e..0000000 --- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/MessageCountAppSpec.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.examples.state - -import scala.concurrent.Future -import scala.util.Success - -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} - -import org.apache.gearpump.cluster.ClientToMaster.SubmitApplication -import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult -import org.apache.gearpump.cluster.{MasterHarness, TestUtil} -import org.apache.gearpump.streaming.examples.state.MessageCountApp._ - -class MessageCountAppSpec - extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter with MasterHarness { - - before { - startActorSystem() - } - - after { - shutdownActorSystem() - } - - protected override def config = TestUtil.DEFAULT_CONFIG - - property("MessageCount should succeed to submit application with required arguments") { - val requiredArgs = Array( - s"-$SOURCE_TOPIC", "source", - s"-$SINK_TOPIC", "sink", - s"-$ZOOKEEPER_CONNECT", "localhost:2181", - s"-$BROKER_LIST", "localhost:9092", - s"-$DEFAULT_FS", "hdfs://localhost:9000" - ) - val optionalArgs = Array( - s"-$SOURCE_TASK", "2", - s"-$COUNT_TASK", "2", - s"-$SINK_TASK", "2" - ) - - val args = { - Table( - ("requiredArgs", "optionalArgs"), - (requiredArgs, optionalArgs.take(0)), - (requiredArgs, optionalArgs.take(2)), - (requiredArgs, optionalArgs.take(4)), - (requiredArgs, optionalArgs) - ) - } - - val masterReceiver = createMockMaster() - forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) => - val args = requiredArgs ++ optionalArgs - Future { - MessageCountApp.main(masterConfig, args) - } - masterReceiver.expectMsgType[SubmitApplication](PROCESS_BOOT_TIME) - masterReceiver.reply(SubmitApplicationResult(Success(0))) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala index 5affb5e..158baeb 100644 --- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala @@ -33,7 +33,6 @@ import org.scalatest.{Matchers, PropSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.state.api.PersistentTask import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig} import org.apache.gearpump.streaming.task.UpdateCheckpointClock import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java index a0996b3..22425e3 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java @@ -18,6 +18,7 @@ package org.apache.gearpump.streaming.examples.wordcountjava; +import org.apache.gearpump.DefaultMessage; import org.apache.gearpump.Message; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.streaming.javaapi.Task; @@ -45,7 +46,7 @@ public class Split extends Task { // Split the TEXT to words String[] words = TEXT.split(" "); for (int i = 0; i < words.length; i++) { - context.output(new Message(words[i], Instant.now().toEpochMilli())); + context.output(DefaultMessage.apply(words[i], Instant.now().toEpochMilli())); } self().tell(new Watermark(Instant.now()), self()); } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java index 3daa6e0..41cdbdc 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Sum.java @@ -42,8 +42,8 @@ public class Sum extends Task { } @Override - public void onNext(Message messagePayLoad) { - String word = (String) (messagePayLoad.msg()); + public void onNext(Message message) { + String word = (String) (message.value()); Integer current = wordCount.get(word); if (current == null) { current = 0; http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index d8262fd..2830b16 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -19,6 +19,7 @@ package org.apache.gearpump.streaming.examples.wordcountjava.dsl; import com.typesafe.config.Config; +import org.apache.gearpump.DefaultMessage; import org.apache.gearpump.Message; import org.apache.gearpump.cluster.ClusterConfig; import org.apache.gearpump.cluster.UserConfig; @@ -79,7 +80,7 @@ public class WordCount { @Override public Message read() { - return Message.apply(str, Instant.now()); + return new DefaultMessage(str, Instant.now()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala index dbefc93..6f482fa 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Sum.scala @@ -20,11 +20,10 @@ package org.apache.gearpump.streaming.examples.wordcount import java.time.Instant import java.util.concurrent.TimeUnit + import scala.collection.mutable import scala.concurrent.duration.FiniteDuration - import akka.actor.Cancellable - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.task.{Task, TaskContext} @@ -45,9 +44,9 @@ class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, override def onNext(msg: Message): Unit = { if (null != msg) { - val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L) + val current = map.getOrElse(msg.value.asInstanceOf[String], 0L) wordCount += 1 - map.put(msg.msg.asInstanceOf[String], current + 1) + map.put(msg.value.asInstanceOf[String], current + 1) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala index 5c2485b..3d412ff 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala @@ -18,15 +18,10 @@ package org.apache.gearpump.akkastream.task -import java.util.concurrent.TimeUnit - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.task.TaskContext -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration - class BatchTask[In, Out](context: TaskContext, userConf : UserConfig) extends GraphTask(context, userConf) { @@ -35,8 +30,8 @@ class BatchTask[In, Out](context: TaskContext, userConf : UserConfig) val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE) val seed = userConf.getValue[In => Out](BatchTask.SEED) - override def onNext(msg : Message) : Unit = { - val data = msg.msg.asInstanceOf[In] + override def onNext(msg: Message) : Unit = { + val data = msg.value.asInstanceOf[In] val time = msg.timestamp context.output(msg) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala index b77b9bd..94954c0 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala @@ -28,7 +28,7 @@ class ConcatTask(context: TaskContext, userConf : UserConfig) val sizeOfOutputs = sizeOfOutPorts var index = 0 - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { output(index, msg) index += 1 if (index == sizeOfOutputs) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala index ae91d1f..602f732 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala @@ -25,7 +25,6 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.task.TaskContext -import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration case object DelayInitialTime @@ -42,8 +41,8 @@ class DelayInitialTask[T](context: TaskContext, userConf : UserConfig) self ! Message(DelayInitialTime, Instant.now()) ) } - override def onNext(msg : Message) : Unit = { - msg.msg match { + override def onNext(msg: Message) : Unit = { + msg.value match { case DelayInitialTime => delayInitialActive = false case _ => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala index 4c19de5..c0756e3 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala @@ -42,8 +42,8 @@ class DropWithinTask[T](context: TaskContext, userConf : UserConfig) ) } - override def onNext(msg : Message) : Unit = { - msg.msg match { + override def onNext(msg: Message) : Unit = { + msg.value match { case DropWithinTimeout => timeoutActive = false case _ => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala index 14ff537..d3e815a 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala @@ -28,7 +28,7 @@ class FlattenMergeTask(context: TaskContext, userConf : UserConfig) val sizeOfOutputs = sizeOfOutPorts var index = 0 - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { output(index, msg) index += 1 if (index == sizeOfOutputs) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala index d982ebd..8de9f8d 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala @@ -38,13 +38,13 @@ class FoldTask[In, Out](context: TaskContext, userConf : UserConfig) }) } - override def onNext(msg : Message) : Unit = { - val data = msg.msg.asInstanceOf[In] + override def onNext(msg: Message) : Unit = { + val data = msg.value.asInstanceOf[In] val time = msg.timestamp aggregator.foreach(func => { aggregated = func(aggregated, data) LOG.info(s"aggregated = $aggregated") - val msg = new Message(aggregated, time) + val msg = Message(aggregated, time) context.output(msg) }) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala index eaf2b3f..12e2d40 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala @@ -33,7 +33,7 @@ class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig) val timeWindow = userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW) val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE) - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala index 741ec43..908e21e 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala @@ -29,7 +29,7 @@ class InterleaveTask(context: TaskContext, userConf : UserConfig) var index = 0 // TODO access upstream and pull - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { output(index, msg) index += 1 if (index == sizeOfInputs) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala index daa1afc..c500ba2 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala @@ -30,15 +30,15 @@ class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig) val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC) implicit val ec = context.system.dispatcher - override def onNext(msg : Message) : Unit = { - val data = msg.msg.asInstanceOf[In] + override def onNext(msg: Message) : Unit = { + val data = msg.value.asInstanceOf[In] val time = msg.timestamp f match { case Some(func) => val fout = func(data) fout.onComplete(value => { value.foreach(out => { - val msg = new Message(out, time) + val msg = Message(out, time) context.output(msg) }) }) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala index ad18f72..1ecc4d0 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala @@ -28,7 +28,7 @@ class MergeTask(context: TaskContext, userConf : UserConfig) val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE) val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS) - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { context.output(msg) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala index 5bea47e..bff4e76 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala @@ -27,7 +27,7 @@ class SingleSourceTask[T](context: TaskContext, userConf : UserConfig) val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { context.output(Message(elem, msg.timestamp)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala index 1b9c4e3..d92e24c 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala @@ -61,7 +61,7 @@ class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig) override def onStart(startTime : Instant) : Unit = {} - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { queue.add(msg) trySendingData() } @@ -71,7 +71,7 @@ class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig) private def trySendingData(): Unit = { if (subscriber != null) { (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg => - subscriber ! msg.msg + subscriber ! msg.value request -= 1 } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala index 5b64a52..b7fd9c3 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala @@ -53,7 +53,7 @@ class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig) override def onStart(startTime : Instant) : Unit = {} - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { LOG.info("AkkaStreamSource receiving message " + msg) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala index b776f2c..d8beeb5 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala @@ -34,8 +34,8 @@ class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : UserConfig f = func() } - override def onNext(msg : Message) : Unit = { - val in: IN = msg.msg.asInstanceOf[IN] + override def onNext(msg: Message) : Unit = { + val in: IN = msg.value.asInstanceOf[IN] val out: Iterable[OUT] = f(in) val iterator = out.iterator while(iterator.hasNext) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala index 7aa4e8e..689a6b3 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala @@ -42,8 +42,8 @@ class TakeWithinTask[T](context: TaskContext, userConf : UserConfig) ) } - override def onNext(msg : Message) : Unit = { - msg.msg match { + override def onNext(msg: Message) : Unit = { + msg.value match { case DropWithinTimeout => timeoutActive = true case _ => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala index 3c7ad87..ef1e35f 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala @@ -24,7 +24,6 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.task.TaskContext -import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration class ThrottleTask[T](context: TaskContext, userConf : UserConfig) @@ -38,8 +37,8 @@ class ThrottleTask[T](context: TaskContext, userConf : UserConfig) val interval = timePeriod.toNanos / cost // TODO control rate from TaskActor - override def onNext(msg : Message) : Unit = { - val data = msg.msg.asInstanceOf[T] + override def onNext(msg: Message) : Unit = { + val data = msg.value.asInstanceOf[T] val time = msg.timestamp context.output(msg) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala index a10e138..086fd48 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.akkastream.task import java.time.Instant -import java.util.Date import java.util.concurrent.TimeUnit import org.apache.gearpump.Message @@ -33,7 +32,6 @@ class TickSourceTask[T](context: TaskContext, userConf : UserConfig) val initialDelay = userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY). getOrElse(FiniteDuration(0, TimeUnit.MINUTES)) - (TickSourceTask.INITIAL_DELAY) val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL). getOrElse(FiniteDuration(0, TimeUnit.MINUTES)) val tick = userConf.getValue[T](TickSourceTask.TICK).get @@ -44,7 +42,7 @@ class TickSourceTask[T](context: TaskContext, userConf : UserConfig) ) } - override def onNext(msg : Message) : Unit = { + override def onNext(msg: Message) : Unit = { context.output(msg) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala index 005d018..7f39ed7 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala @@ -29,10 +29,10 @@ class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig) val unzip = userConf. getValue[UnZipFunction[In, A1, A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip - override def onNext(msg : Message) : Unit = { - val message = msg.msg + override def onNext(msg: Message) : Unit = { + val value = msg.value val time = msg.timestamp - val pair = unzip(message.asInstanceOf[In]) + val pair = unzip(value.asInstanceOf[In]) val (a, b) = pair output(0, Message(a.asInstanceOf[AnyRef], time)) output(1, Message(b.asInstanceOf[AnyRef], time)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala index 7e0c082..ab0116c 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala @@ -31,8 +31,8 @@ class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig) var a1: Option[A1] = None var a2: Option[A2] = None - override def onNext(msg : Message) : Unit = { - val message = msg.msg + override def onNext(msg: Message) : Unit = { + val message = msg.value val time = msg.timestamp a1 match { case Some(x) => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala ---------------------------------------------------------------------- diff --git a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala index 492fffe..41c6499 100644 --- a/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala +++ b/experiments/rabbitmq/src/main/scala/org/apache/gearpump/experimental/rabbitmq/RMQSink.scala @@ -49,7 +49,7 @@ class RMQSink(userConfig: UserConfig, } override def write(message: Message): Unit = { - publish(message.msg) + publish(message.value) } override def close(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala ---------------------------------------------------------------------- diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala index 36a9fe3..9afb1fe 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -59,7 +59,7 @@ class RedisSink( } override def write(message: Message): Unit = { - message.msg match { + message.value match { // GEO case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala index 4969314..86fc0ec 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitioner.scala @@ -42,7 +42,7 @@ private[storm] class StormPartitioner(target: String) extends MulticastPartition override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int) : Array[Int] = { - val stormTuple = msg.msg.asInstanceOf[GearpumpTuple] + val stormTuple = msg.value.asInstanceOf[GearpumpTuple] stormTuple.targetPartitions.getOrElse(target, Array(Partitioner.UNKNOWN_PARTITION_ID)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala index e3b45fb..2f1b77b 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormProcessor.scala @@ -20,8 +20,8 @@ package org.apache.gearpump.experiments.storm.processor import java.time.Instant import java.util.concurrent.TimeUnit -import scala.concurrent.duration.Duration +import scala.concurrent.duration.Duration import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpBolt http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala index 4536277..248ca44 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -41,7 +41,7 @@ import org.apache.gearpump.experiments.storm.util.StormConstants._ import org.apache.gearpump.experiments.storm.util.StormUtil._ import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUtil} import org.apache.gearpump.streaming.DAG -import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext} +import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId} import org.apache.gearpump.util.{Constants, LogUtil} import org.apache.gearpump.{Message, TimeStamp} import org.slf4j.Logger @@ -219,7 +219,7 @@ object GearpumpStormComponent { override def next(message: Message): Unit = { val timestamp = message.timestamp.toEpochMilli collector.setTimestamp(timestamp) - bolt.execute(message.msg.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext, + bolt.execute(message.value.asInstanceOf[GearpumpTuple].toTuple(generalTopologyContext, timestamp)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala index 5fc631b..aabb7c1 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/partitioner/StormPartitionerSpec.scala @@ -52,7 +52,8 @@ class StormPartitionerSpec extends PropSpec with PropertyChecks with Matchers { targetPartitions.foreach { case (target, ps) => { val partitioner = new StormPartitioner(target) - ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, currentPartitionId) + ps shouldBe partitioner.getPartitions(Message(tuple), ps.last + 1, + currentPartitionId) } } val partitionNum = id http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala index 6b894da..05627c9 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -18,8 +18,8 @@ package org.apache.gearpump.experiments.storm.util import java.util.{List => JList, Map => JMap} -import scala.collection.JavaConverters._ +import scala.collection.JavaConverters._ import backtype.storm.generated.Grouping import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -27,8 +27,7 @@ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} - -import org.apache.gearpump.{Message, MIN_TIME_MILLIS, TimeStamp} +import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp} import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.streaming.MockUtil @@ -63,9 +62,9 @@ class StormOutputCollectorSpec stormOutputCollector.setTimestamp(timestamp) stormOutputCollector.emit(streamId, values) shouldBe targetStormTaskIds verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ - case Message(tuple: GearpumpTuple, t) => + message: Message => val expected = new GearpumpTuple(values, stormTaskId, streamId, targetPartitions) - tuple == expected && t == timestamp + message.value == expected && message.timestamp.toEpochMilli == timestamp })) } } @@ -96,11 +95,11 @@ class StormOutputCollectorSpec stormOutputCollector.emitDirect(id, streamId, values) val partitions = Array(StormUtil.stormTaskIdToGearpump(id).index) verify(taskContext, times(1)).output(MockUtil.argMatch[Message]({ - case Message(tuple: GearpumpTuple, t) => { + message: Message => { val expected = new GearpumpTuple(values, stormTaskId, streamId, Map(target -> partitions)) - val result = tuple == expected && t == timestamp + val result = message.value == expected && message.timestamp.toEpochMilli == timestamp result } })) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala index bb56003..7d0838e 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/SequenceFileSink.scala @@ -22,7 +22,6 @@ import java.text.SimpleDateFormat import org.apache.hadoop.fs.Path import org.apache.hadoop.hdfs.HdfsConfiguration import org.apache.hadoop.io.SequenceFile - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.hadoop.lib.HadoopUtil http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala index 822eb5f..04e4781 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala @@ -18,13 +18,12 @@ package org.apache.gearpump.streaming.hadoop.lib.format import org.apache.hadoop.io.{LongWritable, Text, Writable} - import org.apache.gearpump.Message class DefaultSequenceFormatter extends OutputFormatter { override def getKey(message: Message): Writable = new LongWritable(message.timestamp.toEpochMilli) - override def getValue(message: Message): Writable = new Text(message.msg.asInstanceOf[String]) + override def getValue(message: Message): Writable = new Text(message.value.asInstanceOf[String]) override def getKeyClass: Class[_ <: Writable] = classOf[LongWritable] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala index 435d0fc..2e5874c 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala @@ -18,7 +18,6 @@ package org.apache.gearpump.streaming.hadoop.lib.format import org.apache.hadoop.io.Writable - import org.apache.gearpump.Message trait OutputFormatter extends Serializable { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala index 4b41ba1..f5e6483 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala @@ -84,7 +84,7 @@ class HBaseSink(userConfig: UserConfig, tableName: String, } override def write(message: Message): Unit = { - put(message.msg) + put(message.value) } def close(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala index e5534a6..76b4c0b 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala @@ -68,7 +68,7 @@ abstract class AbstractKafkaSink private[kafka]( } override def write(message: Message): Unit = { - message.msg match { + message.value match { case (k: Array[Byte], v: Array[Byte]) => val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, k, v) producer.send(record) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala index a8bda50..d5a8729 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala @@ -28,7 +28,7 @@ import org.apache.gearpump.streaming.kafka.lib.KafkaMessageDecoder import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient import KafkaClient.KafkaClientFactory -import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, FetchThread} +import org.apache.gearpump.streaming.kafka.lib.source.consumer.{FetchThread, KafkaMessage} import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient import org.apache.gearpump.streaming.kafka.util.KafkaConfig @@ -97,6 +97,7 @@ abstract class AbstractKafkaSource( /** * Reads a record from incoming queue, decodes, filters and checkpoints offsets * before returns a Message. Message can be null if the incoming queue is empty. + * * @return a [[org.apache.gearpump.Message]] or null */ override def read(): Message = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala index 9f29022..2b52d76 100644 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala @@ -18,8 +18,6 @@ package org.apache.gearpump.streaming.kafka.lib.source -import java.time.Instant - import com.twitter.bijection.Injection import org.scalacheck.Gen import org.scalatest.prop.PropertyChecks @@ -34,7 +32,7 @@ class DefaultKafkaMessageDecoderSpec extends PropSpec with PropertyChecks with M val msgAndWmk = decoder.fromBytes(kbytes, vbytes) val message = msgAndWmk.message val watermark = msgAndWmk.watermark - message.msg shouldBe vbytes + message.value shouldBe vbytes // processing time as message timestamp and watermark message.timestamp shouldBe watermark } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala index 7e1214e..3789d4e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala @@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.partitioner.UnicastPartitioner class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner { override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = { - val hashCode = fn(message.msg.asInstanceOf[T]).hashCode() + val hashCode = fn(message.value.asInstanceOf[T]).hashCode() (hashCode & Integer.MAX_VALUE) % partitionNum } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index 9a614e8..9c5e347 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -291,7 +291,7 @@ class LoggerSink[T] extends DataSink { } override def write(message: Message): Unit = { - logger.info("logging message " + message.msg) + logger.info("logging message " + message.value) } override def close(): Unit = Unit http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index 86ac933..9571697 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -42,18 +42,19 @@ object TransformTask { val processor = operator.map(FunctionRunner.withEmitFn(_, (out: OUT) => taskContext.output(Message(out, watermarkTime)))) processor.foreach(_.setup()) - buffer.foreach { case message@Message(in, time) => - if (time < watermarkTime) { - processor match { - case Some(p) => - // .toList forces eager evaluation - p.process(in.asInstanceOf[IN]).toList - case None => - taskContext.output(Message(in, watermarkTime)) + buffer.foreach { + message: Message => + if (message.timestamp.toEpochMilli < watermarkTime) { + processor match { + case Some(p) => + // .toList forces eager evaluation + p.process(message.value.asInstanceOf[IN]).toList + case None => + taskContext.output(Message(message.value, watermarkTime)) + } + } else { + nextBuffer +:= message } - } else { - nextBuffer +:= message - } } // .toList forces eager evaluation processor.map(_.finish().toList) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala index 5f9d19b..05ce74e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -68,7 +68,7 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) { def groupBy(message: Message): (GROUP, List[Window]) = { - val ele = message.msg.asInstanceOf[T] + val ele = message.value.asInstanceOf[T] val group = groupByFn(ele) val windows = window.windowFn(new WindowFunction.Context[T] { override def element: T = ele http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 42d50e2..74749b9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -58,7 +58,7 @@ class DefaultWindowRunner[IN, GROUP, OUT]( private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean] override def process(message: Message): Unit = { - val input = message.msg.asInstanceOf[IN] + val input = message.value.asInstanceOf[IN] val (group, windows) = groupBy.groupBy(message) if (!groupedWindowInputs.containsKey(group)) { groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]()) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala index 9b63e04..36c331a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala @@ -25,8 +25,8 @@ class BroadcastPartitioner extends MulticastPartitioner { private var lastPartitionNum = -1 private var partitions = Array.empty[Int] - override def getPartitions( - msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = { + override def getPartitions(msg: Message, partitionNum: Int, + currentPartitionId: Int): Array[Int] = { if (partitionNum != lastPartitionNum) { partitions = (0 until partitionNum).toArray lastPartitionNum = partitionNum http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala index 6137705..dc48741 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala @@ -27,6 +27,6 @@ import org.apache.gearpump.Message */ class HashPartitioner extends UnicastPartitioner { override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum + (msg.value.hashCode() & Integer.MAX_VALUE) % partitionNum } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala index 4371257..0ec2b6f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala @@ -19,8 +19,7 @@ package org.apache.gearpump.streaming.source import java.time.Instant -import org.apache.gearpump.Message -import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS} +import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message} /** * message used by source task to report source watermark. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala index f9f5b33..d3ffaa9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} -import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory import org.apache.gearpump.util.LogUtil import org.apache.gearpump.{Message, TimeStamp} @@ -79,7 +79,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) } final override def onNext(message: Message): Unit = { - checkpointManager.update(message.timeInMillis) + checkpointManager.update(message.timestamp.toEpochMilli) .foreach(state.setNextCheckpointTime) processMessage(state, message) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/176d8276/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala index 3cc1cd0..6947dd4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.task import akka.actor.{ActorRef, ExtendedActorSystem} - import org.apache.gearpump.Message import org.apache.gearpump.transport.netty.TaskMessage import org.apache.gearpump.transport.{Express, HostPort} @@ -55,8 +54,8 @@ trait ExpressTransport { if (null == serializedMessage) { msg match { case message: Message => - val bytes = serializerPool.get().serialize(message.msg) - serializedMessage = SerializedMessage(message.timeInMillis, bytes) + val bytes = serializerPool.get().serialize(message.value) + serializedMessage = SerializedMessage(message.timestamp.toEpochMilli, bytes) case _ => serializedMessage = msg } }
