[GEARPUMP-188] use java.time.Instant for Task start time Author: manuzhang <[email protected]>
Closes #74 from manuzhang/replace_timestamp_with_instant. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23daf0cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23daf0cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23daf0cf Branch: refs/heads/master Commit: 23daf0cf9c1db3fabc1b679993fcf1d6edb43e7d Parents: 6d919ec Author: manuzhang <[email protected]> Authored: Mon Aug 15 23:04:11 2016 +0800 Committer: manuzhang <[email protected]> Committed: Mon Aug 15 23:04:11 2016 +0800 ---------------------------------------------------------------------- .../pagerank/PageRankController.scala | 4 +- .../streaming/examples/complexdag/Node.scala | 6 +- .../streaming/examples/complexdag/Sink.scala | 6 +- .../streaming/examples/complexdag/Source.scala | 6 +- .../examples/fsio/SeqFileStreamProcessor.scala | 8 +- .../examples/fsio/SeqFileStreamProducer.scala | 9 +- .../fsio/SeqFileStreamProcessorSpec.scala | 5 +- .../fsio/SeqFileStreamProducerSpec.scala | 5 +- .../examples/kafka/wordcount/Split.scala | 7 +- .../examples/kafka/wordcount/Sum.scala | 7 +- .../examples/kafka/wordcount/SumSpec.scala | 5 +- .../examples/sol/SOLStreamProcessor.scala | 5 +- .../examples/sol/SOLStreamProducer.scala | 5 +- .../examples/sol/SOLStreamProcessorSpec.scala | 5 +- .../examples/sol/SOLStreamProducerSpec.scala | 5 +- .../processor/NumberGeneratorProcessor.scala | 8 +- .../state/processor/CountProcessorSpec.scala | 6 +- .../NumberGeneratorProcessorSpec.scala | 5 +- .../processor/WindowAverageProcessorSpec.scala | 6 +- examples/streaming/stockcrawler/README.md | 19 -- .../src/main/resources/geardefault.conf | 9 - .../src/main/resources/stock/css/body.png | Bin 201 -> 0 bytes .../src/main/resources/stock/css/custom.css | 115 ----------- .../src/main/resources/stock/css/foot.png | Bin 2250 -> 0 bytes .../src/main/resources/stock/css/header.png | Bin 17350 -> 0 bytes .../src/main/resources/stock/js/stock.js | 157 --------------- .../src/main/resources/stock/stock.html | 87 -------- .../streaming/examples/stock/Analyzer.scala | 170 ---------------- .../streaming/examples/stock/Crawler.scala | 60 ------ .../streaming/examples/stock/Data.scala | 61 ------ .../streaming/examples/stock/QueryServer.scala | 134 ------------- .../streaming/examples/stock/StockMarket.scala | 155 --------------- .../streaming/examples/stock/main/Stock.scala | 86 -------- examples/streaming/transport/README.md | 3 - .../src/main/resources/geardefault.conf | 12 -- .../src/main/resources/transport/css/body.png | Bin 201 -> 0 bytes .../src/main/resources/transport/css/custom.css | 115 ----------- .../src/main/resources/transport/css/foot.png | Bin 2250 -> 0 bytes .../src/main/resources/transport/css/header.png | Bin 17350 -> 0 bytes .../main/resources/transport/js/transport.js | 180 ----------------- .../main/resources/transport/svg/beijing.svg | 199 ------------------- .../src/main/resources/transport/transport.html | 88 -------- .../streaming/examples/transport/Data.scala | 32 --- .../examples/transport/DataSource.scala | 56 ------ .../examples/transport/QueryServer.scala | 154 -------------- .../examples/transport/Transport.scala | 69 ------- .../examples/transport/VelocityInspector.scala | 123 ------------ .../examples/transport/generator/MockCity.scala | 88 -------- .../generator/PassRecordGenerator.scala | 69 ------- .../examples/transport/DataSourceSpec.scala | 45 ----- .../examples/transport/TransportSpec.scala | 69 ------- .../transport/generator/MockCitySpec.scala | 31 --- .../generator/PassRecordGeneratorSpec.scala | 34 ---- .../streaming/examples/wordcountjava/Split.java | 5 +- .../streaming/examples/wordcountjava/Sum.java | 4 +- .../streaming/examples/wordcount/Split.scala | 5 +- .../streaming/examples/wordcount/Sum.scala | 5 +- .../examples/wordcount/SplitSpec.scala | 7 +- .../streaming/examples/wordcount/SumSpec.scala | 5 +- .../storm/processor/StormProcessor.scala | 3 +- .../storm/producer/StormProducer.scala | 3 +- .../storm/topology/GearpumpStormComponent.scala | 9 +- .../storm/processor/StormProcessorSpec.scala | 5 +- .../storm/producer/StormProducerSpec.scala | 5 +- .../topology/GearpumpStormComponentSpec.scala | 9 +- .../topology/GearpumpStormTopologySpec.scala | 1 - .../kafka/lib/source/AbstractKafkaSource.scala | 5 +- .../streaming/kafka/KafkaSourceSpec.scala | 10 +- .../apache/gearpump/streaming/javaapi/Task.java | 5 +- .../gearpump/streaming/dsl/StreamApp.scala | 32 +-- .../streaming/dsl/plan/OpTranslator.scala | 24 +-- .../gearpump/streaming/sink/DataSinkTask.scala | 7 +- .../gearpump/streaming/source/DataSource.scala | 6 +- .../streaming/source/DataSourceTask.scala | 8 +- .../streaming/state/api/PersistentTask.scala | 8 +- .../gearpump/streaming/task/StartTime.scala | 24 --- .../apache/gearpump/streaming/task/Task.scala | 6 +- .../gearpump/streaming/task/TaskActor.scala | 7 +- .../gearpump/streaming/task/TaskWrapper.scala | 12 +- .../streaming/appmaster/AppMasterSpec.scala | 10 +- .../streaming/appmaster/TaskManagerSpec.scala | 4 +- .../streaming/appmaster/TaskSchedulerSpec.scala | 11 +- .../gearpump/streaming/dsl/StreamSpec.scala | 4 +- .../streaming/dsl/plan/OpTranslatorSpec.scala | 12 +- .../streaming/sink/DataSinkTaskSpec.scala | 8 +- .../streaming/source/DataSourceTaskSpec.scala | 7 +- .../streaming/task/SubscriptionSpec.scala | 7 +- 87 files changed, 199 insertions(+), 2617 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala index d461876..aa250da 100644 --- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala +++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankController.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.experiments.pagerank +import java.time.Instant + import akka.actor.Actor.Receive import org.apache.gearpump.cluster.UserConfig @@ -39,7 +41,7 @@ class PageRankController(taskContext: TaskContext, conf: UserConfig) var weights = Map.empty[TaskId, Double] var deltas = Map.empty[TaskId, Double] - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { output(Tick(tick), tasks: _*) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala index 8d163f9..ddd4d1a 100644 --- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Node.scala @@ -18,14 +18,16 @@ package org.apache.gearpump.streaming.examples.complexdag +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class Node(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output - override def onStart(startTime: StartTime): Unit = {} + override def onStart(startTime: Instant): Unit = {} override def onNext(msg: Message): Unit = { val list = msg.msg.asInstanceOf[Vector[String]] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala index 8dfa565..e9b00a0 100644 --- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Sink.scala @@ -18,9 +18,11 @@ package org.apache.gearpump.streaming.examples.complexdag +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} import scala.collection.mutable @@ -28,7 +30,7 @@ class Sink(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, var list = mutable.MutableList[String]() - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { list += getClass.getCanonicalName } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala index 0359519..7abb3fc 100644 --- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala +++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala @@ -18,14 +18,16 @@ package org.apache.gearpump.streaming.examples.complexdag +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { self ! Message("start") } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala index 2e4a556..561346e 100644 --- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessor.scala @@ -18,19 +18,19 @@ package org.apache.gearpump.streaming.examples.fsio import java.io.File +import java.time.Instant import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.FiniteDuration import akka.actor.Cancellable import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile._ import org.apache.hadoop.io.{SequenceFile, Text} - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._ import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProcessor._ -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig) extends Task(taskContext, config) { @@ -49,7 +49,7 @@ class SeqFileStreamProcessor(taskContext: TaskContext, config: UserConfig) private var snapShotTime: Long = 0 private var scheduler: Cancellable = null - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { val fs = FileSystem.get(hadoopConf) fs.deleteOnExit(outputPath) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala index 02d2434..4106a2c 100644 --- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala +++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala @@ -17,15 +17,16 @@ */ package org.apache.gearpump.streaming.examples.fsio +import java.time.Instant + import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.SequenceFile._ import org.apache.hadoop.io.{SequenceFile, Text} - import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.examples.fsio.HadoopConfig._ import org.apache.gearpump.streaming.examples.fsio.SeqFileStreamProducer._ -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig) extends Task(taskContext, config) { @@ -34,12 +35,12 @@ class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig) val value = new Text() val key = new Text() - var reader: SequenceFile.Reader = null + var reader: SequenceFile.Reader = _ val hadoopConf = config.hadoopConf val fs = FileSystem.get(hadoopConf) val inputPath = new Path(config.getString(INPUT_PATH).get) - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { reader = new SequenceFile.Reader(hadoopConf, Reader.file(inputPath)) self ! Start LOG.info("sequence file spout initiated") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala index 7831b14..2edb87f 100644 --- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala +++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProcessorSpec.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.streaming.examples.fsio import java.io.File +import java.time.Instant import scala.collection.mutable.ArrayBuffer import akka.actor.ActorSystem @@ -33,7 +34,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, PropSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.task.{StartTime, TaskId} +import org.apache.gearpump.streaming.task.TaskId import org.apache.gearpump.streaming.{MockUtil, Processor} class SeqFileStreamProcessorSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { @@ -67,7 +68,7 @@ class SeqFileStreamProcessorSpec when(context.taskId).thenReturn(taskId) val processor = new SeqFileStreamProcessor(context, conf) - processor.onStart(StartTime(0)) + processor.onStart(Instant.EPOCH) forAll(kvGenerator) { kv => val (key, value) = kv http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala index ad27e63..a03e68d 100644 --- a/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala +++ b/examples/streaming/fsio/src/test/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducerSpec.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming.examples.fsio +import java.time.Instant + import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration @@ -32,7 +34,6 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.MockUtil._ -import org.apache.gearpump.streaming.task.StartTime class SeqFileStreamProducerSpec extends PropSpec with PropertyChecks with Matchers with BeforeAndAfter { @@ -73,7 +74,7 @@ class SeqFileStreamProducerSpec val context = MockUtil.mockTaskContext val producer = new SeqFileStreamProducer(context, conf) - producer.onStart(StartTime(0)) + producer.onStart(Instant.EPOCH) producer.onNext(Message("start")) val expected = kvPairs.map(kv => kv._1 + "++" + kv._2).toSet http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala index a95f596..b78e788 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Split.scala @@ -18,16 +18,17 @@ package org.apache.gearpump.streaming.examples.kafka.wordcount -import com.twitter.bijection.Injection +import java.time.Instant +import com.twitter.bijection.Injection import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { } override def onNext(msg: Message): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala index 9930b92..58bb884 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/Sum.scala @@ -18,18 +18,19 @@ package org.apache.gearpump.streaming.examples.kafka.wordcount -import com.twitter.bijection.Injection +import java.time.Instant +import com.twitter.bijection.Injection import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output private[wordcount] var wordcount = Map.empty[String, Long] - override def onStart(startTime: StartTime): Unit = {} + override def onStart(startTime: Instant): Unit = {} override def onNext(message: Message): Unit = { val word = message.msg.asInstanceOf[String] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala index 3538ece..e37118a 100644 --- a/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala +++ b/examples/streaming/kafka/src/test/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/SumSpec.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming.examples.kafka.wordcount +import java.time.Instant + import scala.collection.mutable import org.mockito.Matchers._ @@ -27,7 +29,6 @@ import org.scalatest.{FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime class SumSpec extends FlatSpec with Matchers { @@ -39,7 +40,7 @@ class SumSpec extends FlatSpec with Matchers { val taskContext = MockUtil.mockTaskContext val sum = new Sum(taskContext, UserConfig.empty) - sum.onStart(StartTime(0)) + sum.onStart(Instant.EPOCH) val str = "once two two three three three" var totalWordCount = 0 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala index 796b0d2..a16cf4c 100644 --- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessor.scala @@ -18,6 +18,7 @@ package org.apache.gearpump.streaming.examples.sol +import java.time.Instant import java.util.concurrent.TimeUnit import scala.concurrent.duration.FiniteDuration @@ -25,7 +26,7 @@ import akka.actor.Cancellable import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { @@ -38,7 +39,7 @@ class SOLStreamProcessor(taskContext: TaskContext, conf: UserConfig) private var snapShotWordCount: Long = 0 private var snapShotTime: Long = 0 - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS))(reportWordCount()) snapShotTime = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala index 84ed038..c1b11e5 100644 --- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala +++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala @@ -18,12 +18,13 @@ package org.apache.gearpump.streaming.examples.sol +import java.time.Instant import java.util.Random import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._ -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { @@ -36,7 +37,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig) private var rand: Random = null private var messageCount: Long = 0 - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { prepareRandomMessage self ! Start } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala index a6cc966..e3344bf 100644 --- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala +++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProcessorSpec.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming.examples.sol +import java.time.Instant + import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.{FlatSpec, Matchers} @@ -24,7 +26,6 @@ import org.scalatest.{FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime class SOLStreamProcessorSpec extends FlatSpec with Matchers { @@ -33,7 +34,7 @@ class SOLStreamProcessorSpec extends FlatSpec with Matchers { val context = MockUtil.mockTaskContext val sol = new SOLStreamProcessor(context, UserConfig.empty) - sol.onStart(StartTime(0)) + sol.onStart(Instant.EPOCH) val msg = Message("msg") sol.onNext(msg) verify(context, times(1)).output(msg) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala index 2316de8..dc21171 100644 --- a/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala +++ b/examples/streaming/sol/src/test/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducerSpec.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming.examples.sol +import java.time.Instant + import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} @@ -24,7 +26,6 @@ import org.scalatest.{Matchers, WordSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime class SOLStreamProducerSpec extends WordSpec with Matchers { @@ -35,7 +36,7 @@ class SOLStreamProducerSpec extends WordSpec with Matchers { val context = MockUtil.mockTaskContext val producer = new SOLStreamProducer(context, conf) - producer.onStart(StartTime(0)) + producer.onStart(Instant.EPOCH) producer.onNext(Message("msg")) verify(context).output(any[Message]) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala index 0e85f32..134afba 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala @@ -18,17 +18,19 @@ package org.apache.gearpump.streaming.examples.state.processor +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { import taskContext.output private var num = 0L - override def onStart(startTime: StartTime): Unit = { - num = startTime.startTime + override def onStart(startTime: Instant): Unit = { + num = startTime.toEpochMilli self ! Message("start") } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala index 6048034..b95d164 100644 --- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.streaming.examples.state.processor +import java.time.Instant + import scala.concurrent.Await import scala.concurrent.duration._ @@ -33,7 +35,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.state.api.PersistentTask import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig} -import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime} +import org.apache.gearpump.streaming.task.UpdateCheckpointClock import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { @@ -59,7 +61,7 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers { val appMaster = TestProbe()(system) when(taskContext.appMaster).thenReturn(appMaster.ref) - count.onStart(StartTime(0L)) + count.onStart(Instant.EPOCH) appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L)) for (i <- 0L to num) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala index 2268994..d3f645c 100644 --- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.streaming.examples.state.processor +import java.time.Instant + import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -30,7 +32,6 @@ import org.scalatest.{Matchers, WordSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime class NumberGeneratorProcessorSpec extends WordSpec with Matchers { "NumberGeneratorProcessor" should { @@ -47,7 +48,7 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers { val conf = UserConfig.empty val genNum = new NumberGeneratorProcessor(taskContext, conf) - genNum.onStart(StartTime(0)) + genNum.onStart(Instant.EPOCH) mockTaskActor.expectMsgType[Message] genNum.onNext(Message("next")) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala index 0963429..255f869 100644 --- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala +++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.streaming.examples.state.processor +import java.time.Instant + import scala.concurrent.Await import scala.concurrent.duration._ @@ -34,7 +36,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.state.api.PersistentTask import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig} -import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime} +import org.apache.gearpump.streaming.task.UpdateCheckpointClock import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Matchers { @@ -61,7 +63,7 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match val appMaster = TestProbe()(system) when(taskContext.appMaster).thenReturn(appMaster.ref) - windowAverage.onStart(StartTime(0L)) + windowAverage.onStart(Instant.EPOCH) appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 0L)) for (i <- 0L until num) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/README.md b/examples/streaming/stockcrawler/README.md deleted file mode 100644 index b51590f..0000000 --- a/examples/streaming/stockcrawler/README.md +++ /dev/null @@ -1,19 +0,0 @@ -How to use -=================== -1. Start local cluster, - ``` - bin/local - ``` -2. Submit the stock crawler - ``` - bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock - ``` - - If you are behind a proxy, you need to set the proxy address - ``` - bin\gear app -jar examples\gearpump-examples-assembly-0.3.2-SNAPSHOT.jar org.apache.gearpump.streaming.examples.stock.main.Stock -proxy host:port - ``` - -3. Check the UI - http://127.0.0.1:8080 - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf b/examples/streaming/stockcrawler/src/main/resources/geardefault.conf deleted file mode 100644 index acee3bd..0000000 --- a/examples/streaming/stockcrawler/src/main/resources/geardefault.conf +++ /dev/null @@ -1,9 +0,0 @@ -gearpump { - serializers { - "org.apache.gearpump.streaming.examples.stock.StockPrice" = "" - } -} - -spray.can { - server.parsing.max-content-length = "10M" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png deleted file mode 100644 index b5c536c..0000000 Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/body.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css b/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css deleted file mode 100644 index 182d722..0000000 --- a/examples/streaming/stockcrawler/src/main/resources/stock/css/custom.css +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -.ui-datepicker { - font-size: 11px; -} - -.sidebar-label { - font-size: 15px; - font-family: calibri, Arial, Helvetica, sans-serif; -} - -.help { - font-size: 12px; - font-family: calibri, Arial, Helvetica, sans-serif; -} - -div.splitter { - margin: 12px 0px 7px 0px; - clear: both; - border-top: 1px solid #EBEBEB; -} - -input.sidebar { - width: 165px -} - -select.sidebar { - width: 198px -} - -table.dataintable { - font-family: calibri, Arial, Helvetica, sans-serif; - font-size: 15px; - margin-top: 10px; - border-collapse: collapse; - border: 1px solid #888; -} - -table.dataintable th { - vertical-align: baseline; - padding: 5px 15px 5px 5px; - background-color: #EEE; - border: 1px solid #888; - text-align: left; -} - -table.dataintable td { - vertical-align: text-top; - padding: 5px 15px 5px 5px; - background-color: #FFFFFF; - border: 1px solid #AAA; -} - -#search { - width: 100px; - height: 25px; - position: relative; - left: 0px; - top: 5px; -} - -#mytable { - width: 100%; - height: 300; - float: left; -} - -#mychart { - height: 250px; - width: 100%; -} - -#Menu { - height: 100%; - width: 245px; - float: left; -} - -#header { - height: 115px; - background-image: url(header.png); -} - -#body { - height: 100%; - width: 100%; - background-image: url(body.png); - background-size: 100% 100%; -} - -#footer { - color: white; - height: 70px; - line-height: 70px; - text-align: middle; - clear: both; - text-align: center; - background-image: url(foot.png); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png deleted file mode 100644 index 5db91b5..0000000 Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/foot.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png b/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png deleted file mode 100644 index 9284e44..0000000 Binary files a/examples/streaming/stockcrawler/src/main/resources/stock/css/header.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js b/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js deleted file mode 100644 index 97f1e07..0000000 --- a/examples/streaming/stockcrawler/src/main/resources/stock/js/stock.js +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -function initChart(chartid, tableid, stockId) { - require.config({ - paths: { - echarts: 'http://echarts.baidu.com/build/dist' - } - }); - - require( - [ - 'echarts', - 'echarts/chart/line' - ], - - function (ec) { - // åºäºåå¤å¥½çdomï¼åå§åechartså¾è¡¨ - var myChart = ec.init(document.getElementById(chartid)); - var dataPoints = 100; - var timeTicket; - clearInterval(timeTicket); - timeTicket = setInterval(function () { - $.getJSON("report/" + stockId, function (json) { - STOCK_NAME = json.name - - var maxDrawnDown = json.currentMax[0].max.price - json.currentMax[0].min.price; - var time = new Date(json.currentMax[0].current.timestamp).toLocaleTimeString().replace(/^\D*/, ''); - // å¨ææ°æ®æ¥å£ addData - myChart.addData([ - [ - 0, // ç³»åç´¢å¼ - maxDrawnDown.toFixed(2), // æ°å¢æ°æ® - false, // æ°å¢æ°æ®æ¯å¦ä»éåå¤´é¨æå ¥ - false, // æ¯å¦å¢å éåé¿åº¦ï¼falseåèªå®å é¤åææ°æ®ï¼é头æå ¥å éå°¾ï¼éå°¾æå ¥å é头 - time - ], - [ - 1, // ç³»åç´¢å¼ - json.currentMax[0].current.price.toFixed(2), // æ°å¢æ°æ® - false, // æ°å¢æ°æ®æ¯å¦ä»éåå¤´é¨æå ¥ - false, // æ¯å¦å¢å éåé¿åº¦ï¼falseåèªå®å é¤åææ°æ®ï¼é头æå ¥å éå°¾ï¼éå°¾æå ¥å é头 - time - ] - ]); - document.getElementById(chartid).style.display = "block" - document.getElementById(tableid).innerHTML = "<pre>" + JSON.stringify(json, null, 2) + "</pre>" - }); - }, 2000); - - var subtext_ = "Draw Down" - - var option = { - title: { - text: 'Stock Analysis', - subtext: "Max " + subtext_ - }, - tooltip: { - trigger: 'axis' - }, - legend: { - data: ["Current Price", "Current Draw Down"] - }, - toolbox: { - show: false, - feature: { - mark: {show: true}, - dataView: {show: true, readOnly: false}, - magicType: {show: true, type: ['line', 'bar']}, - restore: {show: true}, - saveAsImage: {show: true} - } - }, - dataZoom: { - show: false, - start: 0, - end: 100 - }, - xAxis: [ - { - type: 'category', - boundaryGap: true, - data: (function () { - var now = new Date(); - var res = []; - var len = dataPoints; - while (len--) { - res.unshift(now.toLocaleTimeString().replace(/^\D*/, '')); - now = new Date(now - 2000); - } - return res; - })() - } - ], - yAxis: [ - { - type: 'value', - scale: true, - name: subtext_ + ' ä»·æ ¼/å ', - boundaryGap: [0, 0.3] - }, - { - type: 'value', - scale: true, - name: 'Current ä»·æ ¼/å ', - boundaryGap: [0, 0.1] - } - ], - series: [ - { - name: "Current Draw Down", - type: 'line', - data: (function () { - var res = []; - var len = dataPoints; - while (len--) { - res.push(0); - } - return res; - })() - }, - { - name: "Current Price", - type: 'line', - yAxisIndex: 1, - data: (function () { - var res = []; - var len = dataPoints; - while (len--) { - res.push(0); - } - return res; - })() - } - ] - }; - - // 为echarts对象å è½½æ°æ® - myChart.setOption(option); - } - ); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/resources/stock/stock.html ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html b/examples/streaming/stockcrawler/src/main/resources/stock/stock.html deleted file mode 100644 index 9682a53..0000000 --- a/examples/streaming/stockcrawler/src/main/resources/stock/stock.html +++ /dev/null @@ -1,87 +0,0 @@ -<!DOCTYPE html> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<html> - -<head> - <meta charset="utf-8"> - <link rel=stylesheet type=text/css href="css/custom.css"> - <script src="http://echarts.baidu.com/build/dist/echarts.js"></script> - <script src="http://libs.baidu.com/jquery/2.0.0/jquery.min.js"></script> - <script src="js/stock.js"></script> - <script type="text/javascript"> - function search_onclick() { - var stockId = document.getElementById('stockId').value - initChart("mychart", "mytable", stockId) - } - </script> -</head> - -<body style="background-color:#F2F2F2"> -<div id="container" style="width:882px; height:450px;margin-left:auto;margin-right:auto;"> - <div style="height:0px"></div> - <div id="header"> - <div - style="font-weight:600;position:relative;left:50px;top:50px;font-family: calibri, Arial, Helvetica, sans-serif;font-size:29px;color:white"> - Big Data Stock Analysis Demo - </div> - </div> - <div id="body"> - <div id="Menu"> - <div style="position:relative;margin-left:30px; margin-right:20px;margin-top:20px;"> - <!-- form to post to accompany to get accompanying cars --> - - <table style="width:100%"> - <tr> - <td class="sidebar-label">Stock Id:</td> - </tr> - <tr> - <td class="sidebar-label">Example: sh600019, sz000002</td> - </tr> - <tr> - <td style="vertical-align:top;"> - <input id="stockId" class="sidebar" type="text" name="stockId"/> - </td> - </tr> - </table> - <div class="splitter"></div> - <div> - <button id="search" onclick="search_onclick()">Search</button> - </div> - </div> - </div> - <div id="content" - style="height:100%;width:585px;float:left;position:relative;left:20px;overflow:scroll;"> - <div - style="height:50px;position:relative;top:15px;vertical-align:middle;font-weight:300;font-family: calibri, Arial, Helvetica, sans-serif;font-size:22px;color:black"> - Analysis Result: - </div> - <div style="height:7px;background-color:#92BDF2;"></div> - - <div id="mychart"></div> - - <div id="mytable"></div> - </div> - </div> - <div id="footer"> - Big Data Team @ Intel - </div> -</div> -</body> -</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala deleted file mode 100644 index f6fdff2..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Analyzer.scala +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.examples.stock - -import scala.collection.immutable - -import akka.actor.Actor.Receive -import org.joda.time.DateTime -import org.joda.time.format.{DateTimeFormat, DateTimeFormatter} - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.examples.stock.Analyzer.HistoricalStates -import org.apache.gearpump.streaming.examples.stock.Price._ -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} -import org.apache.gearpump.util.LogUtil - -/** - * Dradown analyzer - * Definition: http://en.wikipedia.org/wiki/Drawdown_(economics) - */ -class Analyzer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - - val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" - - private var stocksToReport = immutable.Set.empty[String] - private var stockInfos = new immutable.HashMap[String, StockPrice] - - private var currentDownwardsStates = new immutable.HashMap[String, StockPriceState] - private val historicalStates = new HistoricalStates() - private var latestTimeStamp: Long = 0L - - override def onStart(startTime: StartTime): Unit = { - LOG.info("analyzer is started") - } - - override def onNext(msg: Message): Unit = { - msg.msg match { - case stock: StockPrice => - latestTimeStamp = stock.timestamp - checkDate(stock) - stockInfos += stock.stockId -> stock - val downwardsState = updateCurrentStates(stock) - val maxDrawdown = historicalStates.updatePresentMaximal(downwardsState) - } - } - - override def receiveUnManagedMessage: Receive = { - case get@GetReport(stockId, date) => - var currentMax = currentDownwardsStates.get(stockId) - - val dateTime = Option(date) match { - case Some(date) => - currentMax = None - parseDate(dateFormatter, date) - case None => - new DateTime(latestTimeStamp).withTimeAtStartOfDay - } - - val historyMax = Option(dateTime).flatMap(handleHistoricalQuery(stockId, _)) - val name = stockInfos.get(stockId).map(_.name).getOrElse("") - sender ! Report(stockId, name, dateTime.toString, historyMax, currentMax) - } - - private def updateCurrentStates(stock: StockPrice) = { - var downwardsState: StockPriceState = null - if (currentDownwardsStates.contains(stock.stockId)) { - downwardsState = generateNewState(stock, currentDownwardsStates.get(stock.stockId).get) - } else { - downwardsState = StockPriceState(stock.stockId, stock, stock, stock) - } - currentDownwardsStates += stock.stockId -> downwardsState - downwardsState - } - - // Update the stock's latest state. - private def generateNewState(currentPrice: Price, oldState: StockPriceState): StockPriceState = { - if (currentPrice.price > oldState.max.price) { - StockPriceState(oldState.stockID, currentPrice, currentPrice, currentPrice) - } else { - val newState = StockPriceState(oldState.stockID, oldState.max, - Price.min(currentPrice, oldState.min), currentPrice) - newState - } - } - - private def checkDate(stock: StockPrice) = { - if (currentDownwardsStates.contains(stock.stockId)) { - val now = new DateTime(stock.timestamp) - val lastTime = new DateTime(currentDownwardsStates.get(stock.stockId).get.current.timestamp) - // New day - if (now.getDayOfYear > lastTime.getDayOfYear || now.getYear > lastTime.getYear) { - currentDownwardsStates -= stock.stockId - } - } - } - - private def parseDate(format: DateTimeFormatter, input: String): DateTime = { - format.parseDateTime(input) - } - - private def handleHistoricalQuery(stockId: String, date: DateTime) = { - val maximal = historicalStates.getHistoricalMaximal(stockId, date) - maximal - } -} - -object Analyzer { - - class HistoricalStates { - val LOG = LogUtil.getLogger(getClass) - val dateFormatter = DateTimeFormat forPattern "dd/MM/yyyy" - private var historicalMaxRaise = new immutable.HashMap[(String, DateTime), StockPriceState] - private var historicalMaxDrawdown = new immutable.HashMap[(String, DateTime), StockPriceState] - - def updatePresentMaximal(newState: StockPriceState): Option[StockPriceState] = { - val date = Analyzer.getDateFromTimeStamp(newState.current.timestamp) - var newMaximalState: Option[StockPriceState] = null - if (newState.max.price < Float.MinPositiveValue) { - newMaximalState = generateNewMaximal(newState, date, historicalMaxRaise) - if (newMaximalState.nonEmpty) { - historicalMaxRaise += (newState.stockID, date) -> newMaximalState.get - } - } else { - newMaximalState = generateNewMaximal(newState, date, historicalMaxDrawdown) - if (newMaximalState.nonEmpty) { - historicalMaxDrawdown += (newState.stockID, date) -> newMaximalState.get - } - } - newMaximalState - } - - def getHistoricalMaximal(stockId: String, date: DateTime): Option[StockPriceState] = { - historicalMaxDrawdown.get((stockId, date)) - } - - private def generateNewMaximal( - state: StockPriceState, - date: DateTime, - map: immutable.HashMap[(String, DateTime), StockPriceState]) - : Option[StockPriceState] = { - val maximal = map.get((state.stockID, date)) - if (maximal.nonEmpty && maximal.get.drawDown > state.drawDown) { - None - } else { - Some(state) - } - } - } - - def getDateFromTimeStamp(timestamp: Long): DateTime = { - new DateTime(timestamp).withTimeAtStartOfDay() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala deleted file mode 100644 index bb444dd..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Crawler.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.examples.stock - -import scala.concurrent.duration._ - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} - -class Crawler(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - - import taskContext._ - - val FetchStockPrice = Message("FetchStockPrice") - - lazy val stocks = { - val stockIds = conf.getValue[Array[String]]("StockId").get - val size = if (stockIds.length % parallelism > 0) { - stockIds.length / parallelism + 1 - } else { - stockIds.length / parallelism - } - - val start = taskId.index * size - val end = (taskId.index + 1) * size - stockIds.slice(start, end) - } - - scheduleOnce(1.seconds)(self ! FetchStockPrice) - - val stockMarket = conf.getValue[StockMarket](classOf[StockMarket].getName).get - - override def onStart(startTime: StartTime): Unit = { - // Nothing - } - - override def onNext(msg: Message): Unit = { - stockMarket.getPrice(stocks).foreach { price => - output(new Message(price, price.timestamp)) - } - scheduleOnce(5.seconds)(self ! FetchStockPrice) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala deleted file mode 100644 index 94a85ff..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/Data.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.examples.stock - -// scalastyle:off equals.hash.code case class has equals defined -case class StockPrice( - stockId: String, name: String, price: String, delta: String, pecent: String, volume: String, - money: String, timestamp: Long) { - override def hashCode: Int = stockId.hashCode -} -// scalastyle:on equals.hash.code case class has equals defined - -case class Price(price: Float, timestamp: Long) - -object Price { - - import scala.language.implicitConversions - - implicit def StockPriceToPrice(stock: StockPrice): Price = { - Price(stock.price.toFloat, stock.timestamp) - } - - def min(first: Price, second: Price): Price = { - if (first.price < second.price) { - first - } else { - second - } - } -} - -case class StockPriceState(stockID: String, max: Price, min: Price, current: Price) { - - def drawDownPeriod: Long = min.timestamp - max.timestamp - - def recoveryPeriod: Long = current.timestamp - min.timestamp - - def drawDown: Float = max.price - min.price -} - -case class GetReport(stockId: String, date: String) - -case class Report( - stockId: String, name: String, date: String, historyMax: Option[StockPriceState], - currentMax: Option[StockPriceState]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala deleted file mode 100644 index 01ccb3e..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/QueryServer.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.examples.stock - -import java.util.concurrent.TimeUnit -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -import akka.actor.Actor._ -import akka.actor.{Actor, ActorRefFactory, Props} -import akka.io.IO -import akka.pattern.ask -import spray.can.Http -import spray.http.StatusCodes -import spray.json._ -import spray.routing.{HttpService, Route} -import upickle.default.write - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterDataDetailRequest -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.ProcessorId -import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef} -import org.apache.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary} -import org.apache.gearpump.streaming.examples.stock.QueryServer.WebServer -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} - -class QueryServer(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - import scala.concurrent.ExecutionContext.Implicits.global - - import taskContext.{appId, appMaster} - - var analyzer: (ProcessorId, ProcessorSummary) = null - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - - override def onStart(startTime: StartTime): Unit = { - appMaster ! AppMasterDataDetailRequest(appId) - taskContext.actorOf(Props(new WebServer)) - } - - override def onNext(msg: Message): Unit = { - // Skip - } - - override def receiveUnManagedMessage: Receive = messageHandler - - def messageHandler: Receive = { - case detail: StreamAppMasterSummary => - analyzer = detail.processors.find { kv => - val (processorId, processor) = kv - processor.taskClass == classOf[Analyzer].getName - }.get - case getReport@GetReport(stockId, date) => - val parallism = analyzer._2.parallelism - val processorId = analyzer._1 - val analyzerTaskId = TaskId(processorId, (stockId.hashCode & Integer.MAX_VALUE) % parallism) - val requester = sender - import scala.concurrent.Future - (appMaster ? LookupTaskActorRef(analyzerTaskId)) - .asInstanceOf[Future[TaskActorRef]].flatMap { task => - - (task.task ? getReport).asInstanceOf[Future[Report]] - }.map { report => - LOG.info(s"reporting $report") - requester ! report - } - case _ => - // Ignore - } -} - -object QueryServer { - class WebServer extends Actor with HttpService { - - import context.dispatcher - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - def actorRefFactory: ActorRefFactory = context - implicit val system = context.system - - IO(Http) ! Http.Bind(self, interface = "localhost", port = 8080) - - override def receive: Receive = runRoute(webServer ~ staticRoute) - - def webServer: Route = { - path("report" / Segment) { stockId => - get { - onComplete((context.parent ? GetReport(stockId, null)).asInstanceOf[Future[Report]]) { - case Success(report: Report) => - val json = write(report) - complete(pretty(json)) - case Failure(ex) => complete(StatusCodes.InternalServerError, - s"An error occurred: ${ex.getMessage}") - } - } - } - } - - val staticRoute = { - pathEndOrSingleSlash { - getFromResource("stock/stock.html") - } ~ - pathPrefix("css") { - get { - getFromResourceDirectory("stock/css") - } - } ~ - pathPrefix("js") { - get { - getFromResourceDirectory("stock/js") - } - } - } - - private def pretty(json: String): String = { - json.parseJson.prettyPrint - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala deleted file mode 100644 index 24e050b..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/StockMarket.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.examples.stock - -import java.nio.charset.Charset -import scala.io.Codec - -import org.apache.commons.httpclient.methods.GetMethod -import org.apache.commons.httpclient.{HttpClient, MultiThreadedHttpConnectionManager} -import org.htmlcleaner.{HtmlCleaner, TagNode} -import org.joda.time.{DateTime, DateTimeZone} - -import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour -import org.apache.gearpump.transport.HostPort -import org.apache.gearpump.util.LogUtil - -class StockMarket(service: ServiceHour, proxy: HostPort = null) extends Serializable { - - private def LOG = LogUtil.getLogger(getClass) - - @transient - private var connectionManager: MultiThreadedHttpConnectionManager = null - - private val eastMoneyStockPage = "http://quote.eastmoney.com/stocklist.html" - - private val stockPriceParser = - """^var\shq_str_s_([a-z0-9A-Z]+)="([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+)";$""".r - - def shutdown(): Unit = { - Option(connectionManager).map(_.shutdown()) - } - - @transient - private var _client: HttpClient = null - - private def client: HttpClient = { - _client = Option(_client).getOrElse { - val connectionManager = new MultiThreadedHttpConnectionManager() - val client = new HttpClient(connectionManager) - Option(proxy).map(host => client.getHostConfiguration().setProxy(host.host, host.port)) - client - } - _client - } - - def getPrice(stocks: Array[String]): Array[StockPrice] = { - - LOG.info(s"getPrice 1") - - val query = "http://hq.sinajs.cn/list=" + stocks.map("s_" + _).mkString(",") - if (service.inService) { - - LOG.info(s"getPrice 2") - - val get = new GetMethod(query) - client.executeMethod(get) - val current = System.currentTimeMillis() - - val output = scala.io.Source.fromInputStream(get.getResponseBodyAsStream)( - new Codec(Charset forName "GBK")).getLines().flatMap { line => - line match { - case stockPriceParser(stockId, name, price, delta, pecent, volume, money) => - Some(StockPrice(stockId, name, price, delta, pecent, volume, money, current)) - case _ => - None - } - }.toArray - - LOG.info(s"getPrice 3 ${output.length}") - - output - } else { - Array.empty[StockPrice] - } - } - - private val urlPattern = """^.*/([a-zA-Z0-9]+)\.html$""".r - - def getStockIdList: Array[String] = { - val cleaner = new HtmlCleaner - val props = cleaner.getProperties - - val get = new GetMethod(eastMoneyStockPage) - client.executeMethod(get) - - val root = cleaner.clean(get.getResponseBodyAsStream) - - val stockUrls = root.evaluateXPath("//div[@id='quotesearch']//li//a[@href]") - - val elements = root.getElementsByName("a", true) - - val hrefs = (0 until stockUrls.length) - .map(stockUrls(_).asInstanceOf[TagNode].getAttributeByName("href")) - .map { url => - url match { - case urlPattern(code) => code - case _ => null - } - }.toArray - hrefs - } -} - -object StockMarket { - - class ServiceHour(all: Boolean) extends Serializable { - - /** - * Morning openning: 9:30 am - 11:30 am - */ - val morningStart = GMT8(new DateTime(0, 1, 1, 9, 30)).getMillis - val morningEnd = GMT8(new DateTime(0, 1, 1, 11, 30)).getMillis - - /** - * After noon openning: 13:00 pm - 15:00 pm - */ - val afternoonStart = GMT8(new DateTime(0, 1, 1, 13, 0)).getMillis - val afternoonEnd = GMT8(new DateTime(0, 1, 1, 15, 0)).getMillis - - def inService: Boolean = { - - if (all) { - true - } else { - val now = GMT8(DateTime.now()).withDate(0, 1, 1).getMillis - if (now >= morningStart && now <= morningEnd || - now >= afternoonStart && now <= afternoonEnd) { - true - } else { - false - } - } - } - - private def GMT8(time: DateTime): DateTime = { - time.withZone(DateTimeZone.UTC).plusHours(8) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala b/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala deleted file mode 100644 index 6d17c20..0000000 --- a/examples/streaming/stockcrawler/src/main/scala/org/apache/gearpump/streaming/examples/stock/main/Stock.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.examples.stock.main - -import akka.actor.ActorSystem -import org.slf4j.Logger - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import org.apache.gearpump.partitioner.HashPartitioner -import org.apache.gearpump.streaming.examples.stock.StockMarket.ServiceHour -import org.apache.gearpump.streaming.examples.stock.{Analyzer, Crawler, QueryServer, StockMarket} -import org.apache.gearpump.streaming.{Processor, StreamApplication} -import org.apache.gearpump.transport.HostPort -import org.apache.gearpump.util.Graph.Node -import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil} - -/** Tracks the China's stock market index change */ -object Stock extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "crawler" -> CLIOption[Int]("<how many fetcher to get data from remote>", - required = false, defaultValue = Some(10)), - "analyzer" -> CLIOption[Int]("<parallism of analyzer>", - required = false, defaultValue = Some(1)), - "proxy" -> CLIOption[String]("proxy setting host:port, for example: 127.0.0.1:8443", - required = false, defaultValue = Some(""))) - - def crawler(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { - val crawler = Processor[Crawler](config.getInt("crawler")) - val analyzer = Processor[Analyzer](config.getInt("analyzer")) - val queryServer = Processor[QueryServer](1) - - val proxySetting = config.getString("proxy") - val proxy = if (proxySetting.isEmpty) { - null - } else HostPort(proxySetting) - val stockMarket = new StockMarket(new ServiceHour(true), proxy) - val stocks = stockMarket.getStockIdList - - // scalastyle:off println - Console.println(s"Successfully fetched stock id for ${stocks.length} stocks") - // scalastyle:on println - - val userConfig = UserConfig.empty.withValue("StockId", stocks) - .withValue[StockMarket](classOf[StockMarket].getName, stockMarket) - val partitioner = new HashPartitioner - - val p1 = crawler ~ partitioner ~> analyzer - val p2 = Node(queryServer) - val graph = Graph(p1, p2) - val app = StreamApplication("stock_direct_analyzer", graph, userConfig - ) - app - } - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - val context = ClientContext(akkaConf) - - implicit val system = context.system - - val app = crawler(config) - val appId = context.submit(app) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/README.md b/examples/streaming/transport/README.md deleted file mode 100644 index fc9bdfe..0000000 --- a/examples/streaming/transport/README.md +++ /dev/null @@ -1,3 +0,0 @@ -What is this? -============= -A smart transportation example which simulate a city with millions of cars. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/geardefault.conf b/examples/streaming/transport/src/main/resources/geardefault.conf deleted file mode 100644 index 0c8f421..0000000 --- a/examples/streaming/transport/src/main/resources/geardefault.conf +++ /dev/null @@ -1,12 +0,0 @@ -gearpump { - - serializers { - ## Follow this format when adding new serializer for new message types - ## "org.apache.gearpump.Message" = "org.apache.gearpump.streaming.MessageSerializer" - "org.apache.gearpump.streaming.examples.transport.PassRecord" = "" - } -} - -spray.can { - server.parsing.max-content-length = "10M" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/body.png ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/css/body.png b/examples/streaming/transport/src/main/resources/transport/css/body.png deleted file mode 100644 index b5c536c..0000000 Binary files a/examples/streaming/transport/src/main/resources/transport/css/body.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/custom.css ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/css/custom.css b/examples/streaming/transport/src/main/resources/transport/css/custom.css deleted file mode 100644 index f324b6a..0000000 --- a/examples/streaming/transport/src/main/resources/transport/css/custom.css +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -.ui-datepicker { - font-size: 11px; -} - -.sidebar-label { - font-size: 15px; - font-family: calibri, Arial, Helvetica, sans-serif; -} - -.help { - font-size: 12px; - font-family: calibri, Arial, Helvetica, sans-serif; -} - -div.splitter { - margin: 12px 0px 7px 0px; - clear: both; - border-top: 1px solid #EBEBEB; -} - -input.sidebar { - width: 165px -} - -select.sidebar { - width: 198px -} - -table.dataintable { - font-family: calibri, Arial, Helvetica, sans-serif; - font-size: 15px; - margin-top: 10px; - border-collapse: collapse; - border: 1px solid #888; -} - -table.dataintable th { - vertical-align: baseline; - padding: 5px 15px 5px 5px; - background-color: #EEE; - border: 1px solid #888; - text-align: left; -} - -table.dataintable td { - vertical-align: text-top; - padding: 5px 15px 5px 5px; - background-color: #FFFFFF; - border: 1px solid #AAA; -} - -#search { - width: 100px; - height: 25px; - position: relative; - left: 0px; - top: 5px; -} - -#mytable { - width: 100%; - height: 300; - float: left; -} - -#mychart { - height: 400px; - width: 100%; -} - -#Menu { - height: 100%; - width: 245px; - float: left; -} - -#header { - height: 115px; - background-image: url(header.png); -} - -#body { - height: 100%; - width: 100%; - background-image: url(body.png); - background-size: 100% 100%; -} - -#footer { - color: white; - height: 70px; - line-height: 70px; - text-align: middle; - clear: both; - text-align: center; - background-image: url(foot.png); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/foot.png ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/css/foot.png b/examples/streaming/transport/src/main/resources/transport/css/foot.png deleted file mode 100644 index 5db91b5..0000000 Binary files a/examples/streaming/transport/src/main/resources/transport/css/foot.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/examples/streaming/transport/src/main/resources/transport/css/header.png ---------------------------------------------------------------------- diff --git a/examples/streaming/transport/src/main/resources/transport/css/header.png b/examples/streaming/transport/src/main/resources/transport/css/header.png deleted file mode 100644 index 9284e44..0000000 Binary files a/examples/streaming/transport/src/main/resources/transport/css/header.png and /dev/null differ
