Problem statement:
I am building a somewhat time-critical application that receives messages on
a stream (ZMQ) and then operates on each data point that arrives on the
stream. The caveat is that some data points may need more processing time
than most others, because their processing involves additional network I/O
(a remote fetch such as an HTTP GET against various remote servers /
sources, with timeouts of course). In this scenario, I want the processing
of the remaining data points to continue without blocking on the
slow-processing data point. I also want the next batch of data points from
the stream to start being processed while we are still waiting for the
previous batch's slow-processing data point.
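
For concreteness, the per-element work looks roughly like the sketch below.
This is only an illustration, not code from the actual application;
fetchRemote and the timeout values are made-up placeholders.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Placeholder for the real remote call (hypothetical; "3" simulates a slow server).
def fetchRemote(point: String): String = {
  if (point == "3") Thread.sleep(5000)
  s"payload-for-$point"
}

// One data point's processing: a remote fetch with a bounded wait.
def process(point: String): String = {
  val fetch = Future { fetchRemote(point) }   // e.g. an HTTP GET
  try {
    Await.result(fetch, 2.seconds)            // per-fetch timeout
  } catch {
    case _: java.util.concurrent.TimeoutException => "fetch-timed-out"
  }
}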

Progress so far:
To test out the concept, I tried the following: I created a test stream
whose receiver stores a random number string every 100 ms, consumed with a
500 ms batch interval. While processing the resulting DStream, I single out
the string "3", purposely delay its processing, and observe the behavior.

// Imports needed to compile (note: org.apache.spark.Logging is public in
// Spark 1.x only; on 2.x, drop the "with Logging" mix-ins).
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver

// Just a test stream that generates random number strings.
class testInputDStream(
                        @transient ssc_ : StreamingContext,
                        storageLevel: StorageLevel
                      ) extends ReceiverInputDStream[String](ssc_) with Logging {
  def getReceiver(): Receiver[String] = {
    new testReceiver(storageLevel)
  }
}
object testUtils {
  def createStream(
                    ssc: StreamingContext,
                    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
                  ): DStream[String] = {
    new testInputDStream(ssc, storageLevel)
  }
}
class testReceiver(storageLevel: StorageLevel)
  extends Receiver[String](storageLevel) with Logging {

  def onStop() {
    // Nothing to clean up; the receive loop checks isStopped().
  }

  def onStart() {
    // Start the thread that generates the data
    new Thread("Test Receiver") {
      override def run() { receive() }
    }.start()
  }

  /** Generate random number strings until the receiver is stopped */
  private def receive(): Unit = {
    println("Starting test receiver...")
    val num = scala.util.Random
    try {
      while (!isStopped()) {
        val newnum = num.nextInt(10)
        store(newnum.toString)
        Thread.sleep(100)
      }
    } catch {
      case e: Exception => println(e)
    }
  }
}

object test_streaming {
  // Stand-in for the remote fetch: the string "3" simulates a slow element.
  private def gofetch(x: String): List[String] = {
    if (x == "3") {
      println("Slow Processing")
      Thread.sleep(2000)
    }
    List(x, x, x)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    // Prepare our context and stream
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test_streaming")

    // Initialization
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Milliseconds(500))

    val rstream = testUtils.createStream(ssc, StorageLevel.MEMORY_ONLY)

    val ourdata = rstream.map(x => gofetch(x))

    ourdata.count().print()

    ssc.start()
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}

However, I observe that the slow-processing data point blocks the other data
points. Is there a way to achieve what I am trying to do?
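
In case it helps frame the question, the direction I have been considering
(an untested sketch; the 10-second overall cap is just an assumption) is to
launch the fetches as Futures inside mapPartitions, so the elements of a
batch overlap and only the final collect waits on the slowest one:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Untested sketch: start gofetch for every element of the partition
// concurrently, then wait once for all of them with an overall cap.
val concurrentData = rstream.mapPartitions { points =>
  val inFlight = points.map(x => Future { gofetch(x) }).toList // force creation
  Await.result(Future.sequence(inFlight), 10.seconds).iterator
}

As far as I understand, this would overlap the fetches within a batch, but
batches would still complete in order, so I am not sure it addresses the
second requirement (starting on the next batch while the previous one waits).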

I would appreciate feedback / inputs.

Thanks,
Omkar


