Receive on driver program (without serializing)

2015-03-30 Thread MartijnD
We are building a wrapper that makes it possible to use reactive streams
(i.e. Observable, see reactivex.io) as input to Spark Streaming. We
therefore tried to create a custom receiver for Spark. However, the
Observable lives in the driver program and is generally not serializable.

Is it possible to create a receiver that runs next to the driver program
and therefore does not need to be serialized?
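
A Spark receiver always runs as a task on an executor. Even with `local[2]`, where that executor is a thread inside the driver JVM, the receiver is serialized first, so it cannot hold an object that only exists on the driver. One possible workaround, sketched here under assumptions and not tested against Spark, is to skip the receiver entirely. You subscribe to the Observable on the driver and buffer the items. Once per batch, you turn the buffered items into an RDD with `ssc.sparkContext.parallelize` and push it into the `scala.collection.mutable.Queue[RDD[T]]` passed to `ssc.queueStream`. The block below shows only the driver-side buffer in plain Scala. `DriverBuffer` is a hypothetical name, not a Spark or RxScala API.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical helper (not part of Spark or RxScala): collects items pushed
// from a driver-side callback and hands them out in per-batch chunks.
final class DriverBuffer[T] {
  private val pending = new ConcurrentLinkedQueue[T]()

  // Intended to be called from the subscriber callback,
  // e.g. observable.subscribe(x => buffer.push(x)).
  def push(item: T): Unit = {
    pending.add(item)
  }

  // Intended to be called once per batch interval on the driver; removes
  // and returns everything currently queued, in arrival (FIFO) order.
  def drain(): Vector[T] = {
    val out = Vector.newBuilder[T]
    var next = pending.poll()
    while (next != null) {
      out += next
      next = pending.poll()
    }
    out.result()
  }
}
```

In the assumed Spark wiring, a driver-side timer would call `drain()` once per batch interval. When the result is non-empty, it would enqueue `ssc.sparkContext.parallelize(items)`. By default (`oneAtATime = true`), `queueStream` consumes one queued RDD per batch.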

---

We tried the following, which gives a `NotSerializableException`:



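import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import rx.lang.scala.Observable

object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Clock")
    val ssc = new StreamingContext(conf, Seconds(1))

    val data: Observable[Long] = ??? // A non-serializable stream of data
    val stream = RxUtils.createStream(ssc, data)

    // Mapping, filtering, etc.

    ssc.start()
    ssc.awaitTermination()
  }
}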

with the `createStream` method defined roughly as follows:

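import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import rx.lang.scala.{Observable, Subscription}

object RxUtils {
  def createStream[T: ClassTag](ssc_ : StreamingContext,
      observable: Observable[T]): ReceiverInputDStream[T] = {
    new RxInputDStream[T](ssc_, observable, StorageLevel.MEMORY_AND_DISK_SER_2)
  }
}

class RxInputDStream[T: ClassTag](ssc_ : StreamingContext, observable: Observable[T],
    storageLevel: StorageLevel) extends ReceiverInputDStream[T](ssc_) {
  override def getReceiver(): Receiver[T] = {
    new RxReceiver(observable, storageLevel)
  }
}

class RxReceiver[T](observable: Observable[T], storageLevel: StorageLevel)
    extends Receiver[T](storageLevel) with Logging {
  var subscription: Option[Subscription] = None

  override def onStart(): Unit = {
    // NOTE: 'observable' is a reference to a variable in the driver program
    subscription = Some(observable.subscribe(x => store(x)))
  }

  // onStop is abstract in Receiver too; release the subscription here.
  override def onStop(): Unit = {
    subscription.foreach(_.unsubscribe())
    subscription = None
  }
}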

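The comment marks the cause of the `NotSerializableException`: the receiver is serialized to be shipped to an executor, and the `observable` it references cannot be serialized along with it.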
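
Spark hands the receiver returned by `getReceiver()` to an executor by serializing it. Everything reachable from the receiver's fields is serialized along with it. The `NotSerializableException` is the Java serialization error raised when one of those objects is not `Serializable`. The stdlib-only Scala sketch below reproduces this without Spark. `Some` stands in for the `Serializable` `Receiver`, and a bare `Object` stands in for the Observable. `serializes` is an illustrative helper, not a Spark API.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative helper: tries to write `obj` with plain Java serialization and
// reports whether it succeeded. A NotSerializableException means some object
// reachable from `obj` does not implement Serializable.
def serializes(obj: AnyRef): Boolean = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try {
    out.writeObject(obj)
    true
  } catch {
    case _: NotSerializableException => false
  } finally {
    out.close()
  }
}

// `Some` is Serializable (like Receiver), but the object it holds is not:
// this mirrors RxReceiver holding a reference to the driver's Observable.
val holdsLocalObject = Some(new Object)
// Holding only serializable data (here a Vector of Ints) works.
val holdsPlainData = Some(Vector(1, 2, 3))
```

Only the receiver's serializable state reaches the executor. A receiver therefore usually creates its data source inside `onStart()` from serializable configuration, such as a host and port, instead of capturing an object that already lives on the driver.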




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Receive-on-driver-program-without-serializing-tp22291.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark Streaming custom receiver for local data

2015-03-19 Thread MartijnD
We are building a wrapper that makes it possible to use reactive streams
(i.e. Observable, see reactivex.io) as input to Spark Streaming. We
therefore tried to create a custom receiver for Spark. However, the
Observable lives in the driver program and is generally not serializable.

Is it possible to create a receiver that runs next to the driver program and
therefore does not need to be serialized?

We tried the following, which gives a NotSerializableException:

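import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import rx.lang.scala.Observable

object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Clock")
    val ssc = new StreamingContext(conf, Seconds(1))

    val data: Observable[Long] = ??? // A non-serializable stream of data
    val stream = RxUtils.createStream(ssc, data)

    // Mapping, filtering, etc.

    ssc.start()
    ssc.awaitTermination()
  }
}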

with the createStream method defined roughly as follows:

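import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import rx.lang.scala.{Observable, Subscription}

object RxUtils {
  def createStream[T: ClassTag](ssc_ : StreamingContext,
      observable: Observable[T]): ReceiverInputDStream[T] = {
    new RxInputDStream[T](ssc_, observable, StorageLevel.MEMORY_AND_DISK_SER_2)
  }
}

class RxInputDStream[T: ClassTag](ssc_ : StreamingContext, observable: Observable[T],
    storageLevel: StorageLevel) extends ReceiverInputDStream[T](ssc_) {
  override def getReceiver(): Receiver[T] = {
    new RxReceiver(observable, storageLevel)
  }
}

class RxReceiver[T](observable: Observable[T], storageLevel: StorageLevel)
    extends Receiver[T](storageLevel) with Logging {
  var subscription: Option[Subscription] = None

  override def onStart(): Unit = {
    // NOTE: 'observable' is a reference to a variable in the driver program
    subscription = Some(observable.subscribe(x => store(x)))
  }

  // onStop is abstract in Receiver too; release the subscription here.
  override def onStop(): Unit = {
    subscription.foreach(_.unsubscribe())
    subscription = None
  }
}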

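The comment marks the cause of the NotSerializableException: the receiver is serialized to be shipped to an executor, and the observable it references cannot be serialized along with it.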



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-custom-receiver-for-local-data-tp22148.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
