Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 5f90b70f9 -> 2913a1fd8


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
new file mode 100644
index 0000000..1b9c4e3
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SinkBridgeTask.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.util.Timeout
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.SinkBridgeTask.RequestMessage
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
+import org.apache.gearpump.util.LogUtil
+import org.reactivestreams.{Publisher, Subscriber, Subscription}
+
+/**
+ * Bridge Task when data flow is from remote Gearpump Task to local 
Akka-Stream Module
+ *
+ *
+ * upstream [[Task]] -> [[SinkBridgeTask]]
+ *                         \              Remote Cluster
+ * -------------------------\----------------------
+ *                           \            Local JVM
+ *                            \|
+ *                       Akka Stream [[Subscriber]]
+ *
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
+ */
+class SinkBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+  extends Task(taskContext, userConf) {
+  import taskContext.taskId
+
+  val queue = new util.LinkedList[Message]()
+  var subscriber: ActorRef = _
+
+  var request: Int = 0
+
+  override def onStart(startTime : Instant) : Unit = {}
+
+  override def onNext(msg : Message) : Unit = {
+    queue.add(msg)
+    trySendingData()
+  }
+
+  override def onStop() : Unit = {}
+
+  private def trySendingData(): Unit = {
+    if (subscriber != null) {
+      (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg =>
+        subscriber ! msg.msg
+        request -= 1
+      }
+    }
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case RequestMessage(n) =>
+      this.subscriber = sender
+      LOG.info("the downstream has requested " + n + " messages from " + 
subscriber)
+      request += n.toInt
+      trySendingData()
+    case msg =>
+      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
+  }
+}
+
+object SinkBridgeTask {
+
+  case class RequestMessage(number: Int)
+
+  class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, 
appId: Int,
+      processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
+    private val taskId = TaskId(processorId, index = 0)
+    private val LOG = LogUtil.getLogger(getClass)
+
+    private var actor: ActorRef = _
+    import system.dispatcher
+
+    private val task =
+      context.askAppMaster[TaskActorRef](appId, 
LookupTaskActorRef(taskId)).map{container =>
+      // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
+      container.task
+    }
+
+    override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = {
+      this.actor = system.actorOf(Props(new ClientActor(subscriber)))
+      subscriber.onSubscribe(this)
+    }
+
+    override def cancel(): Unit = Unit
+
+    private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+
+    override def request(l: Long): Unit = {
+      task.foreach{ task =>
+        task.tell(RequestMessage(l.toInt), actor)
+      }
+    }
+  }
+
+  class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor {
+    def receive: Receive = {
+      case result: AnyRef =>
+        subscriber.onNext(result)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
new file mode 100644
index 0000000..054b483
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SourceBridgeTask.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import akka.actor.Actor.Receive
+import org.apache.gearpump.Message
+import 
org.apache.gearpump.akkastream.task.SourceBridgeTask.{AkkaStreamMessage, 
Complete, Error}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, 
TaskActorRef}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
+import org.reactivestreams.{Subscriber, Subscription}
+
+import scala.concurrent.ExecutionContext
+
+/**
+ * Bridge Task when data flow is from local Akka-Stream Module to remote 
Gearpump Task
+ *
+ *
+ *
+ *      [[SourceBridgeTask]]   --> downstream [[Task]]
+ *                 /|                Remote Cluster
+ * ---------------/--------------------------------
+ *               /                    Local JVM
+ *    Akka Stream [[org.reactivestreams.Publisher]]
+ *
+ *
+ * @param taskContext TaskContext
+ * @param userConf UserConfig
+ */
+class SourceBridgeTask(taskContext : TaskContext, userConf : UserConfig)
+  extends Task(taskContext, userConf) {
+  import taskContext.taskId
+
+  override def onStart(startTime : Instant) : Unit = {}
+
+  override def onNext(msg : Message) : Unit = {
+    LOG.info("AkkaStreamSource receiving message " + msg)
+  }
+
+  override def onStop() : Unit = {}
+
+  override def receiveUnManagedMessage: Receive = {
+    case Error(ex) =>
+      LOG.error("the stream has error", ex)
+    case AkkaStreamMessage(msg) =>
+      LOG.info("we have received message from akka stream source: " + msg)
+      taskContext.output(Message(msg, System.currentTimeMillis()))
+    case Complete(description) =>
+      LOG.info("the stream is completed: " + description)
+    case msg =>
+      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
+  }
+}
+
+
+object SourceBridgeTask {
+  case class Error(ex: java.lang.Throwable)
+
+  case class Complete(description: String)
+
+  case class AkkaStreamMessage[T >: AnyRef](msg: T)
+
+  class SourceBridgeTaskClient[T >: AnyRef](ec: ExecutionContext,
+      context: ClientContext, appId: Int, processorId: ProcessorId) extends 
Subscriber[T] {
+    val taskId = TaskId(processorId, 0)
+    var subscription: Subscription = _
+    implicit val dispatcher = ec
+
+    val task = context.askAppMaster[TaskActorRef](appId,
+      LookupTaskActorRef(taskId)).map{container =>
+      // println("Successfully resolved taskRef for taskId " + taskId + ", " + 
container.task)
+      container.task
+    }
+
+    override def onError(throwable: Throwable): Unit = {
+      task.map(task => task ! Error(throwable))
+    }
+
+    override def onSubscribe(subscription: Subscription): Unit = {
+      // when taskActorRef is resolved, request message from upstream
+      this.subscription = subscription
+      task.map(task => subscription.request(1))
+    }
+
+    override def onComplete(): Unit = {
+      task.map(task => task ! Complete("the upstream is completed"))
+    }
+
+    override def onNext(t: T): Unit = {
+      task.map {task =>
+        task ! AkkaStreamMessage(t)
+      }
+      subscription.request(1)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
new file mode 100644
index 0000000..a0674bc
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/StatefulMapConcatTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class StatefulMapConcatTask[IN, OUT](context: TaskContext, userConf : 
UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val func = userConf.getValue[() => IN => 
Iterable[OUT]](StatefulMapConcatTask.FUNC).get
+  var f: IN => Iterable[OUT] = _
+
+  override def onStart(startTime: Instant) : Unit = {
+    f = func()
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    val in: IN = msg.msg.asInstanceOf[IN]
+    val out: Iterable[OUT] = f(in)
+    val iterator = out.iterator
+    while(iterator.hasNext) {
+      val nextValue = iterator.next
+      context.output(Message(nextValue, System.currentTimeMillis()))
+    }
+  }
+}
+
+object StatefulMapConcatTask {
+  val FUNC = "FUNC"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
new file mode 100644
index 0000000..9559d8f
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TakeWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object TakeWithinTimeout
+
+class TakeWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val timeout = userConf.getValue[FiniteDuration](TakeWithinTask.TIMEOUT).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var timeoutActive = false
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(timeout)(
+      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DropWithinTimeout =>
+        timeoutActive = true
+      case _ =>
+
+    }
+    timeoutActive match {
+      case true =>
+      case false =>
+        context.output(msg)
+    }
+  }
+}
+
+object TakeWithinTask {
+  val TIMEOUT = "TIMEOUT"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
new file mode 100644
index 0000000..3c7ad87
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ThrottleTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class ThrottleTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val cost = userConf.getInt(ThrottleTask.COST).getOrElse(0)
+  val costCalc = userConf.getValue[T => Int](ThrottleTask.COST_CALC)
+  val maxBurst = userConf.getInt(ThrottleTask.MAX_BURST)
+  val timePeriod = userConf.getValue[FiniteDuration](ThrottleTask.TIME_PERIOD).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  val interval = timePeriod.toNanos / cost
+
+  // TODO control rate from TaskActor
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[T]
+    val time = msg.timestamp
+    context.output(msg)
+  }
+}
+
+object ThrottleTask {
+  val COST = "COST"
+  val COST_CALC = "COST_CAL"
+  val MAX_BURST = "MAX_BURST"
+  val TIME_PERIOD = "TIME_PERIOD"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
new file mode 100644
index 0000000..d99d2db
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/TickSourceTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+class TickSourceTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val initialDelay = 
userConf.getValue[FiniteDuration](TickSourceTask.INITIAL_DELAY).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  (TickSourceTask.INITIAL_DELAY)
+  val interval = userConf.getValue[FiniteDuration](TickSourceTask.INTERVAL).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  val tick = userConf.getValue[T](TickSourceTask.TICK).get
+
+  override def onStart(startTime: Instant): Unit = {
+    context.schedule(initialDelay, interval)(
+      self ! Message(tick, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(msg)
+  }
+}
+
+object TickSourceTask {
+  val INITIAL_DELAY = "INITIAL_DELAY"
+  val INTERVAL = "INTERVAL"
+  val TICK = "TICK"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
new file mode 100644
index 0000000..005d018
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Unzip2Task.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.akkastream.task.Unzip2Task.UnZipFunction
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class Unzip2Task[In, A1, A2](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val unzip = userConf.
+    getValue[UnZipFunction[In, A1, 
A2]](Unzip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
+
+  override def onNext(msg : Message) : Unit = {
+    val message = msg.msg
+    val time = msg.timestamp
+    val pair = unzip(message.asInstanceOf[In])
+    val (a, b) = pair
+    output(0, Message(a.asInstanceOf[AnyRef], time))
+    output(1, Message(b.asInstanceOf[AnyRef], time))
+  }
+}
+
+object Unzip2Task {
+  case class UnZipFunction[In, A1, A2](unzip: In => (A1, A2)) extends 
Serializable
+
+  val UNZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.unzip2.function"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
new file mode 100644
index 0000000..7e0c082
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/Zip2Task.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.Zip2Task.ZipFunction
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class Zip2Task[A1, A2, OUT](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val zip = userConf.
+    getValue[ZipFunction[A1, A2, 
OUT]](Zip2Task.ZIP2_FUNCTION)(context.system).get.zip
+  var a1: Option[A1] = None
+  var a2: Option[A2] = None
+
+  override def onNext(msg : Message) : Unit = {
+    val message = msg.msg
+    val time = msg.timestamp
+    a1 match {
+      case Some(x) =>
+        a2 = Some(message.asInstanceOf[A2])
+        a1.foreach(v1 => {
+          a2.foreach(v2 => {
+            val out = zip(v1, v2)
+            context.output(Message(out.asInstanceOf[OUT], time))
+
+          })
+        })
+      case None =>
+        a1 = Some(message.asInstanceOf[A1])
+    }
+  }
+}
+
+object Zip2Task {
+  case class ZipFunction[A1, A2, OUT](val zip: (A1, A2) => OUT) extends 
Serializable
+
+  val ZIP2_FUNCTION = "org.apache.gearpump.akkastream.task.zip2.function"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
new file mode 100644
index 0000000..6ad90df
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/util/MaterializedValueOps.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.util
+
+import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, 
MaterializedValueNode, Module, Transform}
+
+class MaterializedValueOps(mat: MaterializedValueNode) {
+  def resolve[Mat](materializedValues: scala.collection.mutable.Map[Module, 
Any]): Mat = {
+    def resolveMaterialized(mat: MaterializedValueNode,
+        materializedValues: scala.collection.mutable.Map[Module, Any]): Any = 
mat match {
+      case Atomic(m) => materializedValues.getOrElse(m, ())
+      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
+        resolveMaterialized(d2, materializedValues))
+      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
+      case Ignore => ()
+    }
+    resolveMaterialized(mat, materializedValues).asInstanceOf[Mat]
+  }
+}
+
+object MaterializedValueOps{
+  def apply(mat: MaterializedValueNode): MaterializedValueOps = new 
MaterializedValueOps(mat)
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
 
b/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
deleted file mode 100644
index 4ead839..0000000
--- 
a/experiments/akkastream/src/test/scala/akka/stream/gearpump/AttributesSpec.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump
-
-import akka.stream.Attributes
-import org.scalatest.{FlatSpec, Matchers}
-
-class AttributesSpec extends FlatSpec with Matchers {
-  it should "merge the attributes together" in {
-    val a = Attributes.name("aa")
-    val b = Attributes.name("bb")
-
-    val c = a and b
-
-    assert("aa-bb" == c.nameOrDefault())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
 
b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
new file mode 100644
index 0000000..e1846ea
--- /dev/null
+++ 
b/experiments/akkastream/src/test/scala/org/apache/gearpump/akkastream/AttributesSpec.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import akka.stream.Attributes
+import org.scalatest.{FlatSpec, Matchers}
+
+class AttributesSpec extends FlatSpec with Matchers {
+  it should "merge the attributes together" in {
+    val a = Attributes.name("aa")
+    val b = Attributes.name("bb")
+
+    val c = a and b
+
+    assert("aa-bb" == c.nameOrDefault())
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/project/BuildDashboard.scala
----------------------------------------------------------------------
diff --git a/project/BuildDashboard.scala b/project/BuildDashboard.scala
index c14b9d6..cfa6aae 100644
--- a/project/BuildDashboard.scala
+++ b/project/BuildDashboard.scala
@@ -46,11 +46,11 @@ object BuildDashboard extends sbt.Build {
 
   private lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
     libraryDependencies ++= Seq(
-      "com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % "test",
+      "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % "test",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "com.lihaoyi" %% "upickle" % upickleVersion,
-      "com.softwaremill.akka-http-session" %% "core" % "0.2.5",
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
+      "com.softwaremill.akka-http-session" %% "core" % "0.3.0",
+      "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
       "com.github.scribejava" % "scribejava-apis" % "2.4.0",
       "com.ning" % "async-http-client" % "1.9.33",
       "org.webjars" % "angularjs" % "1.4.9",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/project/BuildExperiments.scala
----------------------------------------------------------------------
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index e07b688..eb5f9e1 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -25,7 +25,7 @@ import sbt.Keys._
 object BuildExperiments extends sbt.Build {
 
   lazy val experiments: Seq[ProjectReference] = Seq(
-    // akkastream,
+    akkastream,
     cgroup,
     redis,
     storm,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6949497..4e30d3f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -23,7 +23,8 @@ object Dependencies {
 
   val crossScalaVersionNumbers = Seq("2.11.8")
   val scalaVersionNumber = crossScalaVersionNumbers.last
-  val akkaVersion = "2.4.3"
+  val akkaVersion = "2.4.16"
+  val akkaHttpVersion = "10.0.1"
   val hadoopVersion = "2.6.0"
   val hbaseVersion = "1.0.0"
   val commonsHttpVersion = "3.1"
@@ -82,10 +83,9 @@ object Dependencies {
       "com.typesafe.akka" %% "akka-agent" % akkaVersion,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
       "com.typesafe.akka" %% "akka-kernel" % akkaVersion,
-      "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion,
+      "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
+      "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
       "org.scala-lang" % "scala-reflect" % scalaVersionNumber,
-      "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4",
       "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion,
       "com.google.guava" % "guava" % guavaVersion,
       "com.codahale.metrics" % "metrics-graphite" % codahaleVersion

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
 
b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
index 3088a39..53ee692 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -149,7 +149,7 @@ class AppMasterService(val master: ActorRef,
             }
           }
       } ~
-      path("metrics" / RestPath) { path =>
+      path("metrics" / RemainingPath) { path =>
         parameterMap { optionMap =>
           parameter("aggregator" ? "") { aggregator =>
             parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index ed15121..be96577 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -102,7 +102,7 @@ class MasterService(val master: ActorRef,
           failWith(ex)
       }
     } ~
-    path("metrics" / RestPath) { path =>
+    path("metrics" / RemainingPath) { path =>
       parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { 
readOption: String =>
         val query = QueryHistoryMetrics(path.head.toString, readOption)
         onComplete(askActor[HistoryMetrics](master, query)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
 
b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
index 804b34f..4989364 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
@@ -60,8 +60,8 @@ import org.apache.gearpump.services.util.UpickleUtil._
 class SecurityService(inner: RouteService, implicit val system: ActorSystem) 
extends RouteService {
 
   // Use scheme "GearpumpBasic" to avoid popping up web browser native 
authentication box.
-  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = 
"gearpump",
-    params = Map.empty)
+  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = 
Some("gearpump"),
+    params = Map.empty[String, String])
 
   val LOG = LogUtil.getLogger(getClass, "AUDIT")
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
index 284d3f2..7b33987 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -19,10 +19,12 @@
 package org.apache.gearpump.services
 
 import akka.actor.ActorSystem
+import akka.http.scaladsl.marshalling.ToResponseMarshallable
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.marshalling.ToResponseMarshallable._
+import akka.http.scaladsl.server.{RejectionHandler, StandardRoute}
 import akka.stream.Materializer
-
 import org.apache.gearpump.util.Util
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -56,14 +58,14 @@ class StaticService(override val system: ActorSystem, 
supervisorPath: String)
       getFromResource("index.html")
     } ~
     path("favicon.ico") {
-      complete(StatusCodes.NotFound)
+      complete(ToResponseMarshallable(StatusCodes.NotFound))
     } ~
     pathPrefix("webjars") {
       get {
         getFromResourceDirectory("META-INF/resources/webjars")
       }
     } ~
-    path(Rest) { path =>
+    path(Remaining) { path =>
       getFromResource("%s" format path)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala 
b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
index 8268d61..954fe97 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
@@ -63,7 +63,7 @@ class WorkerService(val master: ActorRef, override val 
system: ActorSystem)
           failWith(ex)
       }
     } ~
-    path("metrics" / RestPath ) { path =>
+    path("metrics" / RemainingPath ) { path =>
       val workerId = WorkerId.parse(workerIdString)
       parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
         val query = QueryHistoryMetrics(path.head.toString, readOption)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index ca8d89e..d4b3719 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
  */
 class StreamApplication(
     override val name: String, val inputUserConfig: UserConfig,
-    dag: Graph[ProcessorDescription, PartitionerDescription])
+    val dag: Graph[ProcessorDescription, PartitionerDescription])
   extends Application {
 
   require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 82ea7c7..5aaf2fa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -124,12 +124,11 @@ case class DataSinkOp(
  * to another Op to be used
  */
 case class ChainableOp[IN, OUT](
-    fn: SingleInputFunction[IN, OUT]) extends Op {
+    fn: SingleInputFunction[IN, OUT],
+    userConfig: UserConfig = UserConfig.empty) extends Op {
 
   override def description: String = fn.description
 
-  override def userConfig: UserConfig = UserConfig.empty
-
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
       case op: ChainableOp[OUT, _] =>


Reply via email to