[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-08-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-gearpump/pull/67


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-08-21 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r75613138
  
--- Diff: 
examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
 ---
@@ -31,14 +32,14 @@ class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
   private var num = 0L
   override def onStart(startTime: Instant): Unit = {
     num = startTime.toEpochMilli
-    self ! Message("start")
+    self ! Watermark(Instant.ofEpochMilli(num))
--- End diff --

This could simply be `self ! Watermark(startTime)`.
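
One caveat, sketched below in plain Scala without any Gearpump types: round-tripping an `Instant` through `toEpochMilli` truncates it to millisecond precision, so the two forms are interchangeable only when `startTime` carries no sub-millisecond component. `viaMillis` is a hypothetical helper name used for illustration.

```scala
import java.time.Instant

object InstantRoundTrip {
  // Hypothetical helper mirroring Instant.ofEpochMilli(startTime.toEpochMilli).
  def viaMillis(t: Instant): Instant = Instant.ofEpochMilli(t.toEpochMilli)

  def main(args: Array[String]): Unit = {
    val millisOnly = Instant.ofEpochMilli(1471737600000L)
    val withNanos = millisOnly.plusNanos(500L)
    // Millisecond-precision instants survive the round trip unchanged.
    println(viaMillis(millisOnly) == millisOnly)
    // Sub-millisecond nanoseconds are dropped by toEpochMilli.
    println(viaMillis(withNanos) == withNanos)
  }
}
```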




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-28 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72598874
  
--- Diff: 
external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 ---
@@ -74,11 +74,10 @@ abstract class AbstractKafkaSource(
   private lazy val kafkaClient: KafkaClient = kafkaClientFactory.getKafkaClient(config)
   private lazy val fetchThread: FetchThread = fetchThreadFactory.getFetchThread(config, kafkaClient)
   private lazy val messageDecoder = config.getConfiguredInstance(
-    KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder])
-  private lazy val timestampFilter = config.getConfiguredInstance(
--- End diff --

Previously, the filter was used to drop old messages (e.g. timestamp < 
startTime), and this happened implicitly inside KafkaSource. Now I think 
filtering should be defined explicitly by users in downstream operations, 
similar to `withAllowedLateness` in the Beam API, although that is not yet 
available in the Gearpump API.
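
A minimal sketch of what such an explicit, user-defined filter might look like, written in plain Scala without Gearpump types. `Event`, `dropLate`, and `allowedLatenessMs` are hypothetical names for illustration; they are not part of the Gearpump or Beam APIs.

```scala
object LateFilter {
  // Hypothetical stand-in for a timestamped message.
  final case class Event(timestampMs: Long, payload: String)

  // Keep events whose timestamp is no older than startTimeMs - allowedLatenessMs.
  def dropLate(events: Seq[Event], startTimeMs: Long, allowedLatenessMs: Long): Seq[Event] =
    events.filter(_.timestampMs >= startTimeMs - allowedLatenessMs)

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(90L, "a"), Event(98L, "b"), Event(105L, "c"))
    // Strict filtering, equivalent to the old "timestamp < startTime" drop.
    println(dropLate(events, startTimeMs = 100L, allowedLatenessMs = 0L).map(_.payload))
    // Tolerating 5 ms of lateness also keeps the event at 98.
    println(dropLate(events, startTimeMs = 100L, allowedLatenessMs = 5L).map(_.payload))
  }
}
```

With an allowed lateness of zero this behaves like the implicit filter described above; a positive value leaves the trade-off between completeness and latency to the user.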




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-28 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72588764
  
--- Diff: 
external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 ---
@@ -92,6 +91,7 @@ abstract class AbstractKafkaSource(
 
     LOG.info("KafkaSource opened at start time {}", startTime)
     this.startTime = startTime
--- End diff --

good point




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-28 Thread whjiang
Github user whjiang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72590179
  
--- Diff: 
external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 ---
@@ -74,11 +74,10 @@ abstract class AbstractKafkaSource(
   private lazy val kafkaClient: KafkaClient = kafkaClientFactory.getKafkaClient(config)
   private lazy val fetchThread: FetchThread = fetchThreadFactory.getFetchThread(config, kafkaClient)
   private lazy val messageDecoder = config.getConfiguredInstance(
-    KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder])
-  private lazy val timestampFilter = config.getConfiguredInstance(
--- End diff --

Why was the filter needed previously, and why is it no longer needed?




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-28 Thread whjiang
Github user whjiang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72589692
  
--- Diff: 
examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
 ---
@@ -86,4 +90,30 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser {
     val appId = context.submit(application(config, context.system))
     context.close()
   }
+
+
+  class EventTimeKafkaMessageDecoder extends KafkaMessageDecoder {
+    private var count = 0
+    private var localMin = Long.MaxValue
+    private var watermark = 0L
+    private val LOG = LogUtil.getLogger(classOf[EventTimeKafkaMessageDecoder])
+    /**
+     * @param key key of a kafka message, can be NULL
+     * @param value value of a kafka message
+     * @return a gearpump Message
+     */
+    override def fromBytes(key: Array[Byte], value: Array[Byte]): MessageAndWatermark = {
+      Injection.invert[Long, Array[Byte]](key).map { eventTime =>
+        if (count == 10) {
+          watermark = localMin
--- End diff --

Shall we consider some delay on the watermark? For example, if the source gets 
messages at 10:00, should we report the watermark as 9:55 instead of 10:00, 
since data is out of order and may be delayed?
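
One way to picture the suggestion is a watermark that trails the largest event time seen so far by a fixed delay. This differs from the `localMin` heuristic in the diff above, and `Tracker` and `delayMs` are hypothetical names. A minimal plain-Scala sketch under those assumptions:

```scala
object DelayedWatermark {
  // Hypothetical tracker: watermark = max event time seen so far minus a fixed delay.
  final class Tracker(delayMs: Long) {
    private var maxEventTimeMs = Long.MinValue
    private var watermarkMs = Long.MinValue

    // Record one event time and return the current watermark, which never decreases.
    def observe(eventTimeMs: Long): Long = {
      maxEventTimeMs = math.max(maxEventTimeMs, eventTimeMs)
      watermarkMs = math.max(watermarkMs, maxEventTimeMs - delayMs)
      watermarkMs
    }
  }

  def main(args: Array[String]): Unit = {
    val minute = 60000L
    val tracker = new Tracker(delayMs = 5 * minute)
    // Times are minutes since midnight, expressed in milliseconds.
    println(tracker.observe(600 * minute) / minute) // event at 10:00
    println(tracker.observe(597 * minute) / minute) // late event at 9:57
    println(tracker.observe(602 * minute) / minute) // event at 10:02
  }
}
```

In this sketch an event at 10:00 yields a watermark of 9:55, so the later-arriving event stamped 9:57 is still ahead of the watermark rather than behind it.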




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-27 Thread manuzhang
Github user manuzhang commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72433772
  
--- Diff: 
examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
 ---
@@ -86,4 +90,30 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser {
     val appId = context.submit(application(config, context.system))
     context.close()
   }
+
+
+  class EventTimeKafkaMessageDecoder extends KafkaMessageDecoder {
+    private var count = 0
+    private var localMin = Long.MaxValue
+    private var watermark = 0L
+    private val LOG = LogUtil.getLogger(classOf[EventTimeKafkaMessageDecoder])
+    /**
+     * @param key key of a kafka message, can be NULL
+     * @param value value of a kafka message
+     * @return a gearpump Message
+     */
+    override def fromBytes(key: Array[Byte], value: Array[Byte]): MessageAndWatermark = {
+      Injection.invert[Long, Array[Byte]](key).map { eventTime =>
+        if (count == 10) {
--- End diff --

This example is for testing only and will be removed.




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-27 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72413816
  
--- Diff: 
external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 ---
@@ -92,6 +91,7 @@ abstract class AbstractKafkaSource(
 
     LOG.info("KafkaSource opened at start time {}", startTime)
     this.startTime = startTime
--- End diff --

Is `startTime` still needed if we change `maybeRecover()` to 
`maybeRecover(startTime)`?




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-27 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72412991
  
--- Diff: 
examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
 ---
@@ -86,4 +90,30 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser {
     val appId = context.submit(application(config, context.system))
     context.close()
   }
+
+
+  class EventTimeKafkaMessageDecoder extends KafkaMessageDecoder {
+    private var count = 0
+    private var localMin = Long.MaxValue
+    private var watermark = 0L
+    private val LOG = LogUtil.getLogger(classOf[EventTimeKafkaMessageDecoder])
+    /**
+     * @param key key of a kafka message, can be NULL
+     * @param value value of a kafka message
+     * @return a gearpump Message
+     */
+    override def fromBytes(key: Array[Byte], value: Array[Byte]): MessageAndWatermark = {
+      Injection.invert[Long, Array[Byte]](key).map { eventTime =>
+        if (count == 10) {
--- End diff --

Should this be `count % 10 == 0`?
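
The difference matters if `count` is never reset (the quoted hunk does not show whether it is): `count == 10` then holds exactly once, whereas `count % 10 == 0` holds on every tenth message. A small plain-Scala sketch of the two conditions, with `firesAt` as a hypothetical helper:

```scala
object PeriodicCheck {
  // Returns the counter values in 1..total at which the predicate holds.
  def firesAt(total: Int)(p: Int => Boolean): Seq[Int] = (1 to total).filter(p)

  def main(args: Array[String]): Unit = {
    // Equality fires a single time over 35 messages.
    println(firesAt(35)(_ == 10))
    // Modulo fires on every tenth message.
    println(firesAt(35)(_ % 10 == 0))
  }
}
```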




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-27 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72413298
  
--- Diff: 
examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaWriter.scala
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka
+
+import java.util.{Random, Properties}
+
+import akka.actor.ActorSystem
+import com.twitter.bijection.Injection
+import org.apache.gearpump.{TimeStamp, Message}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ParseResult, CLIOption, ArgumentsParser}
+import org.apache.gearpump.partitioner.ShufflePartitioner
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.kafka.KafkaSink
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.sink.DataSinkProcessor
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceProcessor}
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.{Graph, LogUtil, AkkaApp}
+import org.apache.gearpump.util.Graph._
+import org.slf4j.Logger
+
+object KafkaWriter extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "sink" -> CLIOption[Int]("", required = false,
+      defaultValue = Some(1)),
+    "brokerList" -> CLIOption[String]("", required = false,
+      defaultValue = Some("localhost:9092")),
+    "sinkTopic" -> CLIOption[String]("", required = false,
+      defaultValue = Some("topic2"))
+  )
+
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    implicit val actorSystem = system
+    val appName = "KafkaWriter"
+    val sinkNum = config.getInt("sink")
+    val brokerList = config.getString("brokerList")
+    val sinkTopic = config.getString("sinkTopic")
+
+    val source = new RandomSource
+    val sourceProcessor = DataSourceProcessor(source, 1)
+    val appConfig = UserConfig.empty
+    val props = new Properties
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val sink = new KafkaSink(sinkTopic, props)
+    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
+    val partitioner = new ShufflePartitioner
+    val graph = sourceProcessor ~ partitioner ~> sinkProcessor
+    val app = StreamApplication(appName, Graph(graph), appConfig)
+    app
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    val appId = context.submit(application(config, context.system))
+    context.close()
+  }
+
+  class RandomSource extends DataSource {
+
+    private var count = 0
+    private var lowerBound = 0
+    private val step = 10
+    private val random = new Random
+
+    override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+
+    override def close(): Unit = {}
+
+    override def read(): Message = {
+      // event times are not in order among every 10 messages
+      val number = random.nextInt(10) + lowerBound
+      val msg = Message(
+        Injection[Long, Array[Byte]](number) -> Injection[Long, Array[Byte]](number))
+      count += 1
+      if (count == step) {
--- End diff --

Should this be `count % step == 0`?




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-27 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/67#discussion_r72412383
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 ---
@@ -46,23 +46,39 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou
   }
   private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
   private var startTime = 0L
+  private var lastWatermark: TimeStamp = startTime
 
   override def onStart(newStartTime: StartTime): Unit = {
     startTime = newStartTime.startTime
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
-    self ! Message("start", System.currentTimeMillis())
+    self ! Message("start")
   }
 
   override def onNext(message: Message): Unit = {
     0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach(context.output)
+      Option(source.read()).foreach { msg =>
+        context.output(msg)
+      }
     }
-    self ! Message("continue", System.currentTimeMillis())
+
+    maybeUpdateWatermark()
+    self ! Message("continue")
   }
 
   override def onStop(): Unit = {
     LOG.info("closing data source...")
     source.close()
   }
+
+  private def maybeUpdateWatermark(): Unit = {
+    val curWatermark = source.getWatermark
+    if (curWatermark > lastWatermark) {
+      lastWatermark = curWatermark
+      self ! UpstreamMinClock(curWatermark)
--- End diff --

Once the watermark moves forward, the actor sends an `UpstreamMinClock` to 
itself. If the DataSource's watermark updates frequently, will that be a 
problem? I mean a flood of `UpstreamMinClock` messages.




[GitHub] incubator-gearpump pull request #67: fix GEARPUMP-32, introduce source water...

2016-07-27 Thread manuzhang
GitHub user manuzhang opened a pull request:

https://github.com/apache/incubator-gearpump/pull/67

fix GEARPUMP-32, introduce source watermark

This is for early review and contains some example code that will be 
removed before merge.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manuzhang/incubator-gearpump watermark

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/67.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #67





