Hi all,
Is there a "best practice" for subscribing to JMS with Spark Streaming? I
have searched but not found anything conclusive.
In the absence of a standard practice the solution I was thinking of was to
use Akka + Camel (akka.camel.Consumer) to create a subscription for a Spark
Streaming Custom Receiver. So the actor would look something like this:
class JmsReceiver(jmsURI: String) extends NetworkReceiver[String] with
Consumer {
//e.g. "jms:sonicmq://localhost:2506/queue?destination=SampleQ1"
def endpointUri = jmsURI
lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)
protected override def onStart() {
blockGenerator.start
}
def receive = {
case msg: CamelMessage => { blockGenerator += msg.body }
case _ => { /* ... */ }
}
protected override def onStop() {
blockGenerator.stop
}
}
And then in the main application create receivers like this:
val ssc = new StreamingContext(...)
object tascQueue extends JmsReceiver[String](ssc) {
override def getReceiver():JmsReceiver[String] = {
new JmsReceiver("jms:sonicmq://localhost:2506/queue?destination=TascQueue")
}
}
ssc.registerInputStream(tascQueue)
Is this the best way to go?
Best regards,
Patrick