Unit testing and Spark Streaming

2014-12-12 Thread Eric Loots
Hi,

I’ve started my first experiments with Spark Streaming and began by setting 
up an environment that uses ScalaTest for unit testing. I poked around on this 
mailing list and googled the topic.

One of the things I wanted to be able to do is to use Scala Sequences as data 
source in the tests (instead of using files for example). For this, queueStream 
on a StreamingContext came in handy.
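To make the batching behaviour of queueStream concrete, here is a simplified, Spark-free model of how it drains its queue. This is my own sketch, not Spark code, and QueueStreamModel and batches are hypothetical names. With oneAtATime = true, each batch interval takes one queued RDD. With oneAtATime = false, the first interval takes the union of everything on the queue. Each inner Seq stands in for one RDD, and an interval with nothing left is modelled as an empty Seq (Spark itself produces no RDD for it, or the optional defaultRDD if one was given).

```scala
import scala.collection.mutable

object QueueStreamModel {
  // Simplified model of queueStream: each inner Seq represents one RDD on the queue.
  // Returns the data "seen" in each of the first `intervals` batch intervals.
  def batches[T](queued: Seq[Seq[T]], oneAtATime: Boolean, intervals: Int): Seq[Seq[T]] = {
    val queue = mutable.Queue(queued: _*)
    (1 to intervals).map { _ =>
      if (oneAtATime) {
        // One queued RDD per interval, until the queue runs dry
        if (queue.nonEmpty) queue.dequeue() else Seq.empty[T]
      } else {
        // Everything currently queued is unioned into a single batch
        val all = queue.toList.flatten
        queue.clear()
        all
      }
    }
  }
}
```

With a single queued RDD, as in the injectData helper below, both settings deliver all the test data in the first batch and nothing afterwards.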

I now have a setup that lets me run WordSpec-style tests like the following:

import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.scalatest.Matchers

class StreamTests extends StreamingContextBaseSpec("Some-tests") with Matchers
    with WordsCountsTestData {

  "Running word count" should {
    "produce the correct word counts for a non-empty list of words" in {
      val streamingData = injectData(data1)
      val wordCountsStream = WordCounter.wordCounter(streamingData)
      val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc)

      wordCounts.toSet shouldBe wordCounts1
    }

    "return count = 1 for the empty string" in {
      val streamingData: InputDStream[String] = injectData(data2)
      val wordCountsStream: DStream[(String, Int)] =
        WordCounter.wordCounter(streamingData)
      val wordCounts: Seq[(String, Int)] =
        startStreamAndExtractResult(wordCountsStream, ssc)

      wordCounts.toSet shouldBe wordCounts2
    }

    "return an empty result for an empty list of words" in {
      val streamingData = injectData(data3)
      val wordCountsStream = WordCounter.wordCounter(streamingData)
      val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc)

      wordCounts.toSet shouldBe wordCounts3
    }
  }

  "Running word count with filtering out words with single occurrence" should {
    "produce the correct word counts for a non-empty list of words" in {
      val streamingData = injectData(data1)
      val wordCountsStream = WordCounter.wordCountOverOne(streamingData)
      val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc)

      // Only words occurring more than once are expected
      wordCounts.toSet shouldBe wordCounts1.filter(_._2 > 1)
    }
  }
}

where WordsCountsTestData (added at the end of this message) is a trait that 
contains the test data and the correct results. 

The two methods under test in the above test code (WordCounter.wordCounter and 
WordCounter.wordCountOverOne) are:

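// Pair-DStream implicits (reduceByKey); this import is only needed before Spark 1.3
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object WordCounter {
  // Counts the occurrences of each word within a batch
  def wordCounter(input: InputDStream[String]): DStream[(String, Int)] = {
    val pairs = input.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts
  }

  // Same as wordCounter, but drops words that occur only once in a batch
  def wordCountOverOne(input: InputDStream[String]): DStream[(String, Int)] = {
    val pairs = input.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts filter (_._2 > 1)
  }
}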
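Instead of hand-writing the expected results in WordsCountsTestData, one option is to derive them from the same input with plain Scala collections and use that as a test oracle. This is a minimal sketch, assuming the test data are Seq[String] (as implied by injectData feeding WordCounter.wordCounter). WordCountOracle and its methods are hypothetical names, not part of the original code. The oracle only matches the streaming result when all input lands in a single batch, which is the case with the injectData helper.

```scala
object WordCountOracle {
  // Spark-free reference for WordCounter.wordCounter:
  // every input element is one word, equal strings are counted together
  def expectedCounts(words: Seq[String]): Set[(String, Int)] =
    words.groupBy(identity).map { case (word, occurrences) => (word, occurrences.size) }.toSet

  // Reference for WordCounter.wordCountOverOne: keep words seen more than once
  def expectedCountsOverOne(words: Seq[String]): Set[(String, Int)] =
    expectedCounts(words).filter(_._2 > 1)
}
```

In the spec above this could be used as, for example, `wordCounts.toSet shouldBe WordCountOracle.expectedCounts(data1)`.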

StreamingContextBaseSpec contains the actual test helper methods such as 
injectData and startStreamAndExtractResult.

package spark.testing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.scalatest.{BeforeAndAfter, WordSpec}

import scala.collection.mutable.Queue
import scala.reflect.ClassTag

class StreamingContextBaseSpec(name: String, silenceSpark: Boolean = true)
    extends WordSpec with BeforeAndAfter {

  val BatchDuration = 10  // milliseconds
  val DeltaTBefore  = 20 * BatchDuration
  val DeltaTAfter   = 10 * BatchDuration

  // Wraps a Seq in a single RDD on a queue; with oneAtATime = false the
  // first batch picks up everything on the queue
  def injectData[T: ClassTag](data: Seq[T]): InputDStream[T] = {
    val dataAsRDD = ssc.sparkContext.parallelize(data)
    val dataAsRDDOnQueue = Queue(dataAsRDD)
    ssc.queueStream(dataAsRDDOnQueue, oneAtATime = false)
  }

  // Starts the context and collects the RDDs of `stream` in a window around the start time
  def startStreamAndExtractResult[T: ClassTag](stream: DStream[T],
      ssc: StreamingContext): Seq[T] = {
    stream.print()  // registers an output operation; start() fails without one
    println(s"~~~ starting execution context $ssc")
    val sTime = System.currentTimeMillis()
    ssc.start()
    val startWindow = new Time(sTime - DeltaTBefore)
    val endWindow = new Time(sTime + DeltaTAfter)
    val sliceRDDs = stream.slice(startWindow, endWindow)
    sliceRDDs.map(rdd => rdd.collect()).flatMap(data => data.toVector)
  }

  var ssc: StreamingContext = _

  before {
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.driver.host")
    if (silenceSpark) SparkUtil.silenceSpark()  // SparkUtil: my own helper, not shown here
    val conf = new SparkConf().setMaster("local").setAppName(name)
    ssc = new StreamingContext(conf, Milliseconds(BatchDuration))
  }

  after {
    println(s"~~~ stopping execution context $ssc")
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.driver.host")
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    ssc.awaitTermination()
    ssc = null
  }
}
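As a back-of-the-envelope check on the window used in startStreamAndExtractResult: with a 10 ms batch interval, the window from sTime − 200 ms to sTime + 100 ms spans 300 ms, so it contains at most 31 batch times. The sketch below just does that arithmetic, assuming (as a simplification) that batch times are multiples of BatchDuration. DStream.slice does its own alignment relative to the stream's start, so this is an upper bound, not a model of Spark's behaviour. SliceWindowArithmetic and candidateBatchTimes are hypothetical names.

```scala
object SliceWindowArithmetic {
  // Values copied from StreamingContextBaseSpec (all in milliseconds)
  val BatchDuration: Long = 10L
  val DeltaTBefore: Long = 20 * BatchDuration // 200 ms before the start time
  val DeltaTAfter: Long = 10 * BatchDuration  // 100 ms after the start time

  // Batch times in [sTime - DeltaTBefore, sTime + DeltaTAfter], assuming
  // batch times are multiples of BatchDuration (simplification)
  def candidateBatchTimes(sTime: Long): Seq[Long] = {
    val windowStart = sTime - DeltaTBefore
    val windowEnd = sTime + DeltaTAfter
    // Round the lower bound up to the next multiple of BatchDuration (positive times assumed)
    val firstAligned = ((windowStart + BatchDuration - 1) / BatchDuration) * BatchDuration
    firstAligned to windowEnd by BatchDuration
  }
}
```

Depending on where the wall-clock start time falls relative to a 10 ms boundary, the window covers 30 or 31 candidate batch times.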

So much for the prelude; now my questions:
Is this a good way to perform this kind of testing?
Are there more efficient ways to run this kind of testing?
To reduce the test run time, I’m running the stream with a batch interval of 
only 10 ms and a window that extends to 100 ms (This seems to work fine as far as 
I can see. When the batch interval is

Re: Unit testing and Spark Streaming

2014-12-12 Thread Emre Sevinc
On Fri, Dec 12, 2014 at 2:17 PM, Eric Loots eric.lo...@gmail.com wrote:
> How can the log level in test mode be reduced (or extended when needed)?

Hello Eric,

The following might be helpful for reducing the log messages during unit
testing:

 http://stackoverflow.com/a/2736/236007

--
Emre Sevinç
https://be.linkedin.com/in/emresevinc


Re: Unit testing and Spark Streaming

2014-12-12 Thread Jay Vyas
https://github.com/jayunit100/SparkStreamingCassandraDemo

On this note, I've built a framework that is mostly pure, so that functional 
unit tests can be run by composing mock data for Twitter statuses with just 
regular JUnit... That might be relevant too.

I think at some point we should come up with a robust test-driven framework 
for building stream apps... And the ScalaTest approach with the injection and 
comparison you used might be a good start.

Thanks for starting this dialogue!
