Problem statement: I am building a somewhat time-critical application that receives messages on a stream (ZMQ) and then operates on each data point that comes in on the stream. The caveat is that some data points need more time to process than most others, because the processing involves additional network I/O (a remote fetch such as an HTTP GET against various remote servers/sources, with timeouts of course). In this scenario, I want the processing of the remaining data points to continue without blocking on the slow-processing data point. I also want the next batch of data points from the stream to start being processed while we are still waiting on the previous batch's slow-processing data point.
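For concreteness, the kind of per-element fetch I have in mind looks roughly like this (the URL and the 2-second timeout are placeholders, not my real code):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical remote fetch: an HTTP GET bounded by a per-request timeout.
def remoteFetch(key: String): Option[String] = {
  val f = Future {
    scala.io.Source.fromURL("http://example.com/lookup/" + key).mkString
  }
  try Some(Await.result(f, 2.seconds)) // wait at most 2 s for this request
  catch { case _: Exception => None }  // timeout or fetch failure => give up
}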
Progress so far: To test out the concept, I tried the following: I created a test stream whose receiver stores a random digit string every 100 ms, processed in 500 ms batches. During processing of the resulting DStream, I single out the string "3" and deliberately delay its processing so I can observe the behavior.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver

// Just a test stream that generates random number strings.
class testInputDStream(
    @transient ssc_ : StreamingContext,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[String](ssc_) with Logging {

  def getReceiver(): Receiver[String] = {
    new testReceiver(storageLevel)
  }
}

object testUtils {
  def createStream(
      ssc: StreamingContext,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): DStream[String] = {
    new testInputDStream(ssc, storageLevel)
  }
}

class testReceiver(storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) with Logging {

  def onStop() { }

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Test Receiver") {
      override def run() { receive() }
    }.start()
  }

  /** Store a random digit string every 100 ms until the receiver is stopped */
  private def receive(): Unit = {
    println("Starting test receiver...")
    val num = scala.util.Random
    try {
      while (!isStopped()) {
        val newnum = num.nextInt(10)
        store(newnum.toString)
        Thread.sleep(100)
      }
    } catch {
      case e: Exception => println(e)
    }
  }
}

object test_streaming {

  private def gofetch(x: String): List[Any] = {
    if (x == "3") {
      println("Slow Processing")
      Thread.sleep(2000) // simulate the slow remote fetch
    }
    List(x, x, x)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    // Prepare our context and stream
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test_streaming")

    // Initialization
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Milliseconds(500))
    val rstream = testUtils.createStream(ssc, StorageLevel.MEMORY_ONLY)

    val ourdata = rstream.map(x => gofetch(x))
    ourdata.count().print()

    ssc.start()
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}

However, I see that the slow-processing data point blocks the other data points. Is there a way of achieving what I am trying to do? I would appreciate feedback / input.

Thanks,
Omkar
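P.S. One direction I have been sketching (untested, so I may well be off base; gofetch and rstream are from the code above, and the 5-second timeout is just a guess) is to launch the fetches as Futures inside mapPartitions, so a slow element overlaps with the others instead of serializing them, and to raise the undocumented spark.streaming.concurrentJobs knob above 1 so the next batch can be scheduled while the previous one is still waiting:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Allow a second batch to run while the previous one is still in flight.
// (spark.streaming.concurrentJobs is undocumented; I saw it mentioned on this
// list. It must be set on conf before the SparkContext is created.)
conf.set("spark.streaming.concurrentJobs", "2")

// Kick off every fetch in the partition concurrently (toList forces the
// futures to start), then wait once per future; the slow "3" now costs at
// most its own timeout instead of holding up every element queued behind it.
val ourdata = rstream.mapPartitions { iter =>
  val futures = iter.map(x => Future(gofetch(x))).toList
  futures.map(f => Try(Await.result(f, 5.seconds)).getOrElse(Nil)).iterator
}

A dedicated ExecutionContext sized for blocking I/O would probably be safer than the global one here, but I wanted to keep the sketch short. Does this sound like a reasonable direction, or is there a more idiomatic way?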