Repository: spark Updated Branches: refs/heads/master 751724b13 -> fb8bb0476
[SPARK-13069][STREAMING] Add "ask" style store() to ActorReceiver Introduces an "ask" style ```store``` in ```ActorReceiver``` as a way to allow an actor receiver to be blocked by back pressure or maxRate. Author: Lin Zhao <l...@exabeam.com> Closes #11176 from lin-zhao/SPARK-13069. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb8bb047 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb8bb047 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb8bb047 Branch: refs/heads/master Commit: fb8bb04766005e8935607069c0155d639f407e8a Parents: 751724b Author: Lin Zhao <l...@exabeam.com> Authored: Thu Feb 25 12:32:17 2016 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Thu Feb 25 12:32:24 2016 -0800 ---------------------------------------------------------------------- .../spark/streaming/akka/ActorReceiver.scala | 39 +++++++++++++++++++- .../streaming/akka/JavaAkkaUtilsSuite.java | 2 + .../spark/streaming/akka/AkkaUtilsSuite.scala | 3 ++ 3 files changed, 43 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala ---------------------------------------------------------------------- diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index c75dc92..33415c1 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps import 
scala.reflect.ClassTag import akka.actor._ import akka.actor.SupervisorStrategy.{Escalate, Restart} +import akka.pattern.ask +import akka.util.Timeout import com.typesafe.config.ConfigFactory import org.apache.spark.{Logging, TaskContext} @@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor { } /** - * Store a single item of received data to Spark's memory. + * Store a single item of received data to Spark's memory asynchronously. * These single items will be aggregated together into data blocks before * being pushed into Spark's memory. */ def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { + context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor { def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. 
+ * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { + context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int, /** Case class to receive data sent by child actors */ private[akka] sealed trait ActorReceiverData private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData +private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData +private[akka] object Ack extends ActorReceiverData /** * Provides Actors as receivers for receiving stream. @@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag]( store(msg.asInstanceOf[T]) n.incrementAndGet + case AskStoreSingleItemData(msg) => + logDebug("received single sync") + store(msg.asInstanceOf[T]) + n.incrementAndGet + sender() ! 
Ack + case ByteBufferData(bytes) => logDebug("received bytes") store(bytes) http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java ---------------------------------------------------------------------- diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java index b732506..ac5ef31 100644 --- a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java +++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.SupervisorStrategy; +import akka.util.Timeout; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.Test; @@ -62,5 +63,6 @@ class JavaTestActor extends JavaActorReceiver { @Override public void onReceive(Object message) throws Exception { store((String) message); + store((String) message, new Timeout(1000)); } } http://git-wip-us.apache.org/repos/asf/spark/blob/fb8bb047/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala index f437585..ce95d9d 100644 --- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.akka +import scala.concurrent.duration._ + import akka.actor.{Props, SupervisorStrategy} import org.apache.spark.SparkFunSuite @@ -60,5 +62,6 @@ 
class AkkaUtilsSuite extends SparkFunSuite { class TestActor extends ActorReceiver { override def receive: Receive = { case m: String => store(m) + case m => store(m, 10.seconds) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org