[SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver
Introduces an "ask" style ```store``` in ```ActorReceiver``` as a way to allow the actor receiver to be blocked by back pressure or maxRate. Author: Lin Zhao <l...@exabeam.com> Closes #11176 from lin-zhao/SPARK-13069. Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/6ac5a189 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/6ac5a189 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/6ac5a189 Branch: refs/heads/master Commit: 6ac5a1893dbab0f44c2723611aa2f1e6cac82a7c Parents: 5d45050 Author: Lin Zhao <l...@exabeam.com> Authored: Thu Feb 25 12:32:17 2016 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Thu Feb 25 12:32:24 2016 -0800 ---------------------------------------------------------------------- .../spark/streaming/akka/ActorReceiver.scala | 39 +++++++++++++++++++- .../streaming/akka/JavaAkkaUtilsSuite.java | 2 + .../spark/streaming/akka/AkkaUtilsSuite.scala | 3 ++ 3 files changed, 43 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/6ac5a189/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala ---------------------------------------------------------------------- diff --git a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index c75dc92..33415c1 100644 --- a/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/streaming-akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps import scala.reflect.ClassTag import akka.actor._ import 
akka.actor.SupervisorStrategy.{Escalate, Restart} +import akka.pattern.ask +import akka.util.Timeout import com.typesafe.config.ConfigFactory import org.apache.spark.{Logging, TaskContext} @@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor { } /** - * Store a single item of received data to Spark's memory. + * Store a single item of received data to Spark's memory asynchronously. * These single items will be aggregated together into data blocks before * being pushed into Spark's memory. */ def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { + context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor { def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. 
+ * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { + context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int, /** Case class to receive data sent by child actors */ private[akka] sealed trait ActorReceiverData private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData +private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData +private[akka] object Ack extends ActorReceiverData /** * Provides Actors as receivers for receiving stream. @@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag]( store(msg.asInstanceOf[T]) n.incrementAndGet + case AskStoreSingleItemData(msg) => + logDebug("received single sync") + store(msg.asInstanceOf[T]) + n.incrementAndGet + sender() ! 
Ack + case ByteBufferData(bytes) => logDebug("received bytes") store(bytes) http://git-wip-us.apache.org/repos/asf/bahir/blob/6ac5a189/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java ---------------------------------------------------------------------- diff --git a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java index b732506..ac5ef31 100644 --- a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java +++ b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.SupervisorStrategy; +import akka.util.Timeout; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.Test; @@ -62,5 +63,6 @@ class JavaTestActor extends JavaActorReceiver { @Override public void onReceive(Object message) throws Exception { store((String) message); + store((String) message, new Timeout(1000)); } } http://git-wip-us.apache.org/repos/asf/bahir/blob/6ac5a189/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala index f437585..ce95d9d 100644 --- a/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala +++ b/streaming-akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.akka +import scala.concurrent.duration._ + import akka.actor.{Props, SupervisorStrategy} import org.apache.spark.SparkFunSuite @@ -60,5 
+62,6 @@ class AkkaUtilsSuite extends SparkFunSuite { class TestActor extends ActorReceiver { override def receive: Receive = { case m: String => store(m) + case m => store(m, 10.seconds) } }