I am trying to test Spark with CEP (complex event processing), and I was shown the sample below.


It is a long piece of code:

package org.apache.spark.streaming.kafka

import java.io.File
import java.util.Arrays
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.Utils

class DirectKafkaStreamSuite
  extends SparkFunSuite
  with BeforeAndAfter
  with BeforeAndAfterAll
  with Eventually
  with Logging {
  // Configuration shared by the tests that build a StreamingContext from it. A SparkContext
  // will not start without a master URL and an application name, so both are set here.
  val sparkConf = new SparkConf()
    .setMaster("local[4]")
    .setAppName(this.getClass.getSimpleName)

  // Per-test contexts and checkpoint directory; the `after` hook inspects them between tests.
  private var sc: SparkContext = _
  private var ssc: StreamingContext = _
  private var testDir: File = _

  // Embedded Kafka helper shared by the whole suite; created in `beforeAll`.
  private var kafkaTestUtils: KafkaTestUtils = _

  // Start one embedded Kafka instance for the whole suite.
  override def beforeAll(): Unit = {
    super.beforeAll()
    kafkaTestUtils = new KafkaTestUtils
    // NOTE(review): assumes KafkaTestUtils exposes setup()/teardown() as in upstream Spark's
    // Kafka 0.8 test utilities; nothing else in this suite starts the embedded broker.
    kafkaTestUtils.setup()
  }

  override def afterAll(): Unit = {
    try {
      if (kafkaTestUtils != null) {
        kafkaTestUtils.teardown()
        kafkaTestUtils = null
      }
    } finally {
      super.afterAll()
    }
  }

  // Stop whatever contexts a test created and delete its checkpoint directory, so that a
  // running context or stale checkpoint never leaks into the next test.
  after {
    if (ssc != null) {
      // StreamingContext.stop() also stops the underlying SparkContext by default.
      ssc.stop()
      ssc = null
      sc = null
    }
    if (sc != null) {
      sc.stop()
      sc = null
    }
    if (testDir != null) {
      Utils.deleteRecursively(testDir)
      testDir = null
    }
  }

  // Sends `data` to three topics, reads them back with a direct stream that starts at the
  // smallest offset, and checks that every RDD partition holds exactly as many records as
  // its OffsetRange spans and that all `totalSent` messages eventually arrive.
  // NOTE(review): this block is incomplete as pasted: the test name below is split across
  // two lines (an unterminated string literal) and several closing brackets are missing.
  test("basic stream receiving with multiple topics and smallest
starting offset") {
    val topics = Set("basic1", "basic2", "basic3")
    val data = Map("a" -> 7, "b" -> 9)
    topics.foreach { t =>
      kafkaTestUtils.sendMessages(t, data)
    // NOTE(review): the closing `}` of `topics.foreach` appears to be missing.
    val totalSent = data.values.sum * topics.size
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    // NOTE(review): the closing `)` of `kafkaParams` appears to be missing.

    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder,
        // NOTE(review): the value-decoder type argument and `](` appear to be cut off here
        // (compare the complete call in the "creating stream by offset" test), and the
        // closing `}` of `withClue` is missing after the argument list.
        ssc, kafkaParams, topics)

    val allReceived = new ConcurrentLinkedQueue[(String, String)]()

    // Hold a reference to the current offset ranges so they can be used downstream.
    // NOTE(review): the next line is the wrapped tail of this comment and is not valid code.
used downstream
    var offsetRanges = Array[OffsetRange]()

    stream.transform { rdd =>
      // Get the offset ranges in the RDD
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // NOTE(review): `transform` expects the closure to return an RDD, but this one ends
      // with an assignment; presumably a trailing `rdd` is missing here.
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      // NOTE(review): the closing `}` of this `for` loop appears to be missing.
      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
      // For each partition, get size of the range in the partition,
      // and the number of items in the partition
        val off = offsetRanges(i)
        val all = iter.toSeq
        val partSize = all.size
        val rangeSize = off.untilOffset - off.fromOffset
        Iterator((partSize, rangeSize))
      // NOTE(review): the closing `}` of `mapPartitionsWithIndex` is missing; presumably it
      // was followed by `.collect()` so the checks below run on the driver — confirm.

      // Verify whether number of elements in each partition
      // matches with the corresponding offset range
      collected.foreach { case (partSize, rangeSize) =>
        assert(partSize === rangeSize, "offset ranges are wrong")
      // NOTE(review): the closing braces of this `foreach` and of `foreachRDD` are missing.
    stream.foreachRDD { rdd =>
allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
    // NOTE(review): nothing here calls `ssc.start()`, so no batch would ever run and the
    // `eventually` check below could not pass; a start call presumably belongs here.
    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
      assert(allReceived.size === totalSent,
        "didn't get expected number of messages, messages:\n" +
      // NOTE(review): the failure message is cut off after `+` (presumably the received
      // messages were appended), and the closing `)`/`}` of `assert`, `eventually` and
      // `test` are missing.

  // Checks that a direct stream using auto.offset.reset=largest starts no earlier than the
  // latest offset that existed before the context was set up, then sends new data.
  test("receiving from largest starting offset") {
    val topic = "largest"
    val topicPartition = TopicAndPartition(topic, 0)
    val data = Map("a" -> 10)
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "largest"
    // NOTE(review): the closing `)` of `kafkaParams` appears to be missing.
    val kc = new KafkaCluster(kafkaParams)
    def getLatestOffset(): Long = {
    // NOTE(review): the body of `getLatestOffset` is missing; presumably it queries `kc` for
    // the latest leader offset of `topicPartition` — confirm against KafkaCluster's API.

    // Send some initial messages before starting context
    kafkaTestUtils.sendMessages(topic, data)
    eventually(timeout(10 seconds), interval(20 milliseconds)) {
      assert(getLatestOffset() > 3)
    // NOTE(review): the closing `}` of this `eventually` block appears to be missing.
    val offsetBeforeStart = getLatestOffset()

    // Setup context and kafka stream with largest offset
    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder,
        // NOTE(review): the value-decoder type argument and `](` appear to be cut off here,
        // and the closing `}` of `withClue` is missing after the argument list.
        ssc, kafkaParams, Set(topic))
      // NOTE(review): the next three lines are the arguments of an `assert(...)` whose
      // `assert(` and closing `)` are missing.
      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
        .fromOffsets(topicPartition) >= offsetBeforeStart,
      "Start offset not from latest"

    // Collect only the message values.
    val collectedData = new ConcurrentLinkedQueue[String]()
    stream.map { _._2 }.foreachRDD { rdd =>
collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
    // NOTE(review): `ssc.start()` is never called, so no batches would be processed.
    val newData = Map("b" -> 10)
    kafkaTestUtils.sendMessages(topic, newData)
    eventually(timeout(10 seconds), interval(50 milliseconds)) {
    // NOTE(review): the body of this `eventually` block and the test's closing braces are
    // missing; presumably it asserts that "b" arrives while the pre-start "a" messages do not.

  // Creates a direct stream from an explicit starting offset (11 for partition 0) with a
  // message handler that keeps only the message value, and checks that the stream's start
  // offset is not before the latest offset observed before the context was set up.
  test("creating stream by offset") {
    val topic = "offset"
    val topicPartition = TopicAndPartition(topic, 0)
    val data = Map("a" -> 10)
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "largest"
    // NOTE(review): the closing `)` of `kafkaParams` appears to be missing.
    val kc = new KafkaCluster(kafkaParams)
    def getLatestOffset(): Long = {
    // NOTE(review): the body of `getLatestOffset` is missing; presumably it queries `kc` for
    // the latest leader offset of `topicPartition` — confirm against KafkaCluster's API.

    // Send some initial messages before starting context
    kafkaTestUtils.sendMessages(topic, data)
    eventually(timeout(10 seconds), interval(20 milliseconds)) {
      assert(getLatestOffset() >= 10)
    // NOTE(review): the closing `}` of this `eventually` block appears to be missing.
    val offsetBeforeStart = getLatestOffset()

    // Set up the context and a Kafka stream that starts from an explicit offset.
    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder, String](
        ssc, kafkaParams, Map(topicPartition -> 11L),
        (m: MessageAndMetadata[String, String]) => m.message())
      // NOTE(review): the closing `}` of `withClue` is missing, and the next three lines are
      // the arguments of an `assert(...)` whose `assert(` and closing `)` are missing.
      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
        .fromOffsets(topicPartition) >= offsetBeforeStart,
      "Start offset not from latest"

    val collectedData = new ConcurrentLinkedQueue[String]()
    stream.foreachRDD { rdd =>
collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
    // NOTE(review): `ssc.start()` is never called, so no batches would be processed.
    val newData = Map("b" -> 10)
    kafkaTestUtils.sendMessages(topic, newData)
    eventually(timeout(10 seconds), interval(50 milliseconds)) {
    // NOTE(review): the body of this `eventually` block and the test's closing braces are
    // missing; presumably it asserts that the newly sent message "b" is collected.

  // Test to verify the offset ranges can be recovered from the checkpoints.
  // It sends 1..10 in groups, records the offset ranges of the generated RDDs, recreates the
  // context from `testDir`, checks that the recovered ranges match, then sends 11..20 and
  // expects the stateful running total to equal (1 to 20).sum, i.e. each record counted once.
  test("offset recovery") {
    val topic = "recovery"
    testDir = Utils.createTempDir()

    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    // NOTE(review): the closing `)` of `kafkaParams` appears to be missing.

    // Send data to Kafka and wait for it to be received
    def sendDataAndWaitForReceive(data: Seq[Int]) {
      val strings = data.map { _.toString}
      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
      eventually(timeout(10 seconds), interval(50 milliseconds)) {
        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
      // NOTE(review): the closing braces of `eventually` and of this helper are missing.

    // Setup the streaming context
    ssc = new StreamingContext(sparkConf, Milliseconds(100))
    val kafkaStream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder,
        // NOTE(review): the value-decoder type argument, `](` and the closing `}` of
        // `withClue` appear to be missing around this call.
        ssc, kafkaParams, Set(topic))
    // Key every value under the single key "key" so the state below is one running sum.
    val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
    val stateStream = keyedStream.updateStateByKey { (values:
Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))
    // NOTE(review): the closing `}` of `updateStateByKey` is missing. Nothing visible sets a
    // checkpoint directory (e.g. on `testDir`), which `updateStateByKey` and the recovery
    // below depend on — presumably a checkpoint call was lost from the paste.

    // This is to collect the raw data received from Kafka
    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
      val data = rdd.map { _._2 }.collect()
      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
    // NOTE(review): the closing `}` of this `foreachRDD` is missing.

    // Record the latest running total so the test can check that every record is
    // counted exactly once.
    stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
      rdd.collect().headOption.foreach { x =>
DirectKafkaStreamSuite.total = x._2 }
    // NOTE(review): the closing `}` of `stateStream.foreachRDD` is missing, and no
    // `ssc.start()` call is visible before data is sent.

    // Send some data and wait for them to be received
    for (i <- (1 to 10).grouped(4)) {
    // NOTE(review): the loop body is missing; presumably `sendDataAndWaitForReceive(i)`
    // followed by a closing `}`.

    // Verify that offset ranges were generated
    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
    // NOTE(review): the next two lines are the arguments of an `assert(...)` whose
    // `assert(` and closing `)` are missing. The context is also never stopped before the
    // restart below.
      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
      "starting offset not zero"
    logInfo("====== RESTARTING ========")

    // Recover context from checkpoints
    ssc = new StreamingContext(testDir.getAbsolutePath)
    // NOTE(review): the right-hand side is missing; presumably the Kafka input stream taken
    // from the recovered context's graph, cast to DStream[(String, String)].
    val recoveredStream =

    // Verify offset ranges have been recovered
    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
    assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x =>
(x._1, x._2.toSet) }
    // NOTE(review): the next lines are the arguments of an `assert(...)` whose `assert(`,
    // the `}` closing `forall`, and the final `)` are missing.
      recoveredOffsetRanges.forall { or =>
        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
      "Recovered ranges are not the same as the ones generated"
    // Restart the context, send more data and verify the total at the end.
    // If the total is right, each record has been received exactly once.
    // NOTE(review): no `ssc.start()` call is visible for the recovered context.
    sendDataAndWaitForReceive(11 to 20)
    eventually(timeout(10 seconds), interval(50 milliseconds)) {
      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
    // NOTE(review): the closing braces of `eventually` and of the test are missing.

  // Checks that the input-record counts reported to a StreamingListener (InputInfoCollector)
  // at batch submission, start and completion all equal the number of messages sent.
  test("Direct Kafka stream report input information") {
    val topic = "report-test"
    val data = Map("a" -> 7, "b" -> 9)
    kafkaTestUtils.sendMessages(topic, data)

    val totalSent = data.values.sum
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    // NOTE(review): the closing `)` of `kafkaParams` appears to be missing.

    import DirectKafkaStreamSuite._
    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    // NOTE(review): `collector` is never registered with `ssc` (e.g. via
    // `addStreamingListener`), so nothing would call its callbacks and the listener
    // assertions below could not pass.
    val collector = new InputInfoCollector

    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder,
        // NOTE(review): the value-decoder type argument, `](` and the closing `}` of
        // `withClue` appear to be missing around this call.
        ssc, kafkaParams, Set(topic))

    val allReceived = new ConcurrentLinkedQueue[(String, String)]

    stream.foreachRDD { rdd =>
allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
    // NOTE(review): `ssc.start()` is never called, so no batches would be processed.
    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
      assert(allReceived.size === totalSent,
        "didn't get expected number of messages, messages:\n" +
      // NOTE(review): the failure message is cut off after `+` and this `assert` is never
      // closed.

      // Each listener counter should equal the number of messages sent.
      assert(collector.numRecordsSubmitted.get() === totalSent)
      assert(collector.numRecordsStarted.get() === totalSent)
      assert(collector.numRecordsCompleted.get() === totalSent)
      // NOTE(review): the closing braces of `eventually` and of the test are missing.

  // Without a rate controller, only spark.streaming.kafka.maxRatePerPartition limits each
  // partition. getDirectKafkaStream configures it as 100 with batchIntervalMilliseconds = 100,
  // so each partition is expected to get 10 messages per batch.
  test("maxMessagesPerPartition with backpressure disabled") {
    val topic = "maxMessagesPerPartition"
    val kafkaStream = getDirectKafkaStream(topic, None)

    val input = Map(
      TopicAndPartition(topic, 0) -> 50L,
      TopicAndPartition(topic, 1) -> 50L)
    // Compare whole Options so that a None result fails the assertion instead of throwing.
    assert(kafkaStream.maxMessagesPerPartition(input) === Some(Map(
      TopicAndPartition(topic, 0) -> 10L,
      TopicAndPartition(topic, 1) -> 10L)))
  }

  // getDirectKafkaStream starts both partitions at offset 0, so asking for offsets 0 means
  // there is no lag to consume and no per-partition limit should be produced.
  test("maxMessagesPerPartition with no lag") {
    val topic = "maxMessagesPerPartition"
    val rateController =
      Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
    val kafkaStream = getDirectKafkaStream(topic, rateController)

    val input = Map(
      TopicAndPartition(topic, 0) -> 0L,
      TopicAndPartition(topic, 1) -> 0L)
    // NOTE(review): the original assertion was lost from the paste; "no limit at zero lag"
    // is the presumed intent of this test — confirm.
    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
  }

  // The rate controller reports 1000 msgs/s, but spark.streaming.kafka.maxRatePerPartition
  // (100, set by getDirectKafkaStream) must still cap each partition, so the expected limit
  // stays at 10 messages per partition per batch.
  test("maxMessagesPerPartition respects max rate") {
    val topic = "maxMessagesPerPartition"
    val rateController =
      Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
    val kafkaStream = getDirectKafkaStream(topic, rateController)

    val input = Map(
      TopicAndPartition(topic, 0) -> 1000L,
      TopicAndPartition(topic, 1) -> 1000L)
    // Compare whole Options so that a None result fails the assertion instead of throwing.
    assert(kafkaStream.maxMessagesPerPartition(input) === Some(Map(
      TopicAndPartition(topic, 0) -> 10L,
      TopicAndPartition(topic, 1) -> 10L)))
  }

  // Sends the `messages` map to a two-partition topic and drives a direct stream whose rate
  // controller is fed by a ConstantEstimator. For each rate in Seq(100, 50, 20) it waits for
  // a collected batch of size rate * batch interval (in seconds).
  test("using rate controller") {
    val topic = "backpressure"
    val topicPartitions = Set(TopicAndPartition(topic, 0),
TopicAndPartition(topic, 1))
    kafkaTestUtils.createTopic(topic, 2)
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    // NOTE(review): the closing `)` of `kafkaParams` appears to be missing.

    val batchIntervalMilliseconds = 100
    val estimator = new ConstantEstimator(100)
    val messages = Map("foo" -> 200)
    kafkaTestUtils.sendMessages(topic, messages)

    val sparkConf = new SparkConf()
      // NOTE(review): the two comments below refer to a setting that is missing from this
      // builder chain (presumably a single-core local master and an app name) — confirm.
      // Safe, even with streaming, because we're using the direct API.
      // Using 1 core is useful to make the test more predictable.
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // Setup the streaming context
    // NOTE(review): this constructor call is truncated; presumably the batch duration is
    // `Milliseconds(batchIntervalMilliseconds))`, since that value is otherwise used only for
    // the expected batch size and the polling interval below.
    ssc = new StreamingContext(sparkConf,

    val kafkaStream = withClue("Error creating direct stream") {
      val kc = new KafkaCluster(kafkaParams)
      val messageHandler = (mmd: MessageAndMetadata[String, String])
=> (mmd.key, mmd.message)
      // Start every partition at its earliest leader offset (an empty map if the lookup
      // returns an error).
      val m = kc.getEarliestLeaderOffsets(topicPartitions)
        .fold(e => Map.empty[TopicAndPartition, Long], m =>
m.mapValues(lo => lo.offset))

      // Direct stream whose rate controller is driven by the test's ConstantEstimator.
      new DirectKafkaInputDStream[String, String, StringDecoder,
StringDecoder, (String, String)](
          ssc, kafkaParams, m, messageHandler) {
        override protected[streaming] val rateController =
          Some(new DirectKafkaRateController(id, estimator))
      // NOTE(review): the closing braces of the anonymous subclass and of `withClue` are
      // missing.

    val collectedData = new ConcurrentLinkedQueue[Array[String]]()

    // Used for assertion failure messages.
    def dataToString: String =
      collectedData.asScala.map(_.mkString("[", ",",
"]")).mkString("{", ", ", "}")

    // This is to collect the raw data received from Kafka
    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
      val data = rdd.map { _._2 }.collect()
      // NOTE(review): `data` is never added to `collectedData` (presumably
      // `collectedData.add(data)`), the closing `}` is missing, and `ssc.start()` is never
      // called.


    // Try different rate limits.
    // Wait for arrays of data to appear matching the rate.
    Seq(100, 50, 20).foreach { rate =>
      collectedData.clear()       // Empty this buffer on each pass.
      estimator.updateRate(rate)  // Set a new rate.
      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
      // NOTE(review): the next line is the wrapped tail of the comment above.
length in secs.
      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
      // NOTE(review): the start of this call is missing; presumably an
      // `eventually(timeout(...),` precedes the `interval(...)` argument below.
interval(batchIntervalMilliseconds.milliseconds)) {
        // Assert that rate estimator values are used to determine the batch size.
        // Funky "-" in message makes the complete assertion message read better.
        // NOTE(review): the next line is the wrapped tail of the comment above.
read better.
        assert(collectedData.asScala.exists(_.size == expectedSize),
          s" - No arrays of size $expectedSize for rate $rate found in
      // NOTE(review): the interpolated message above is unterminated (presumably it ended
      // with `$dataToString")`), and the closing braces of `eventually`, `foreach` and the
      // test are missing.

  /**
   * Returns the offset ranges of every RDD the given stream has generated so far, paired
   * with the batch time and sorted by that time.
   *
   * The stream must be a direct Kafka stream: each generated RDD is cast to KafkaRDD.
   */
  private def getOffsetRanges[K, V](
      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
    kafkaStream.generatedRDDs.mapValues { rdd =>
      rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
    }.toSeq.sortBy { _._1 }
  }

  /**
   * Builds, without starting it, a direct Kafka stream over partitions 0 and 1 of `topic`.
   * Both partitions start at offset 0. The stream runs on a fresh StreamingContext with a
   * 100 ms batch interval and spark.streaming.kafka.maxRatePerPartition = 100.
   *
   * @param mockRateController replaces the stream's rate controller; `None` means no
   *                           backpressure.
   *
   * The new context is assigned to the suite's `ssc` field.
   */
  private def getDirectKafkaStream(
      topic: String,
      mockRateController: Option[RateController]) = {
    val batchIntervalMilliseconds = 100

    val sparkConf = new SparkConf()
      .setMaster("local[1]")
      .setAppName(this.getClass.getSimpleName)
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // Setup the streaming context
    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))

    val earliestOffsets = Map(
      TopicAndPartition(topic, 0) -> 0L,
      TopicAndPartition(topic, 1) -> 0L)
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, Map[String, String](), earliestOffsets, messageHandler) {
      override protected[streaming] val rateController = mockRateController
    }
  }

object DirectKafkaStreamSuite {
  // State shared between streaming closures and the "offset recovery" test's assertions.
  // NOTE(review): presumably kept in this object, not as test locals, so closures restored
  // from a checkpoint still reach the same instances — confirm.
  val collectedData = new ConcurrentLinkedQueue[String]()
  @volatile var total = -1L

  /**
   * StreamingListener that accumulates the number of input records seen when batches are
   * submitted, started and completed.
   */
  class InputInfoCollector extends StreamingListener {
    val numRecordsSubmitted = new AtomicLong(0L)
    val numRecordsStarted = new AtomicLong(0L)
    val numRecordsCompleted = new AtomicLong(0L)

    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
      numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
    }

    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
      numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
    }

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
    }
  }
}

/**
 * Test RateEstimator that ignores batch statistics and always reports the most recently
 * set rate, which is initially the constructor's `rate`.
 */
private[streaming] class ConstantEstimator(@volatile private var rate: Long)
  extends RateEstimator {

  /** Replaces the rate returned by subsequent `compute` calls. */
  def updateRate(newRate: Long): Unit = {
    rate = newRate
  }

  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double] = Some(rate)
}

/**
 * Test RateController whose latest rate is fixed at construction time. Its `publish`
 * does nothing.
 */
private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
  extends RateController(id, estimator) {
  override def publish(rate: Long): Unit = ()
  override def getLatestRate(): Long = rate
}

/**
 * Temporarily added some samples for the CEP APIs here.
 * TODO: add test cases in streaming/src/test/scala/CepSuite.scala
 */

import org.apache.spark.streaming.dstream.WindowState

/**
 * CEP sample: reads ticks from Kafka, keys them by symbol, and calls
 * `patternMatchByKeyAndWindow` with the pattern "rise drop [rise ]+ deep".
 *
 * Expects args: brokers, comma-separated topics, checkpoint directory.
 * NOTE(review): `WindowState` and `patternMatchByKeyAndWindow` are not defined in this file;
 * presumably they come from a CEP-enabled Spark build — confirm they exist in the Spark
 * version you run against.
 */
object PatternMatchByKey {

  import org.apache.spark.streaming._
  // NOTE(review): `Partitioner` is not used in the visible code.
  import org.apache.spark.Partitioner

  def main(args: Array[String]) {
    val brokers = args(0)
    val topics = args(1)
    // NOTE(review): `checkpointDir` is never used below; presumably it should be passed to
    // `ssc.checkpoint(...)` for the windowed pattern matching.
    val checkpointDir = args(2)

    // Create context with a 6 second batch interval
    // NOTE(review): the SparkConf construction below is truncated after `new`.
    val sparkConf = new
    val ssc = new StreamingContext(sparkConf, Seconds(6))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest")

    case class Tick(symbol: String, price: Int, ts: Long)

    // Read a stream of messages from Kafka
    val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Parse each message value into a Tick. Tokens are split on ',' and ':' and the symbol,
    // price and timestamp are read from tokens 1, 3 and 5 — presumably a format like
    // `symbol:ABC,price:10,ts:1234` (confirm); malformed messages will throw.
    val ticks = messages.map(_._2)
      .map(_.split("[,:]")).map(p => {
      Tick(p(1), p(3).trim.toInt, p(5).trim.toLong)
    // NOTE(review): the closing `})` of this `map` appears to be missing.

    // Key each tick by its symbol.
    val kvTicks = ticks.map(t => (t.symbol, t))

    // Define rise, drop & deep predicates. Each compares the incoming tick's price with the
    // `first` and `prev` ticks exposed by WindowState (presumably the first and the previous
    // event of the match in progress — confirm):
    //   rise: above first and not below prev; drop: not below first but below prev;
    //   deep: below both first and prev.
    // NOTE(review): the closing `}` of each predicate body appears to be missing.
    def rise(in: Tick, ew: WindowState): Boolean = {
      in.price > ew.first.asInstanceOf[Tick].price  &&
        in.price >= ew.prev.asInstanceOf[Tick].price
    def drop(in: Tick, ew: WindowState): Boolean = {
      in.price >= ew.first.asInstanceOf[Tick].price  &&
        in.price < ew.prev.asInstanceOf[Tick].price
    def deep(in: Tick, ew: WindowState): Boolean = {
      in.price < ew.first.asInstanceOf[Tick].price &&
        in.price < ew.prev.asInstanceOf[Tick].price

    // map the predicates to their state names
    val predicateMapping: Map[String, (Tick, WindowState) => Boolean] =
      Map("rise" -> rise, "drop" -> drop, "deep" -> deep)

    // NOTE(review): the pattern string literal below is split across two lines, so it does
    // not compile; PatternMatch uses the single-line form "rise drop [rise ]+ deep".
    // Presumably the arguments are: pattern, predicates, event-time extractor, window
    // length (12 s) and slide interval (6 s) — confirm against the CEP API.
    val matches = kvTicks.patternMatchByKeyAndWindow("rise drop [rise
]+ deep".r,
      predicateMapping, (in: Tick) => in.ts, Seconds(12), Seconds(6))

    // NOTE(review): `matches` is never consumed. Spark Streaming refuses to start without at
    // least one output operation (e.g. `matches.print()`).



    // Start the computation
    // NOTE(review): the start/await calls and the closing braces of `main` and the object
    // are missing after this comment.


/**
 * CEP sample: reads ticks from Kafka, keys them by timestamp, and calls
 * `patternMatchByWindow` with the pattern "rise drop [rise ]+ deep".
 *
 * Expects args: brokers, comma-separated topics, checkpoint directory.
 * NOTE(review): `WindowState` and `patternMatchByWindow` are not defined in this file;
 * presumably they come from a CEP-enabled Spark build — confirm they exist in the Spark
 * version you run against.
 */
object PatternMatch {

  import org.apache.spark.streaming._
  // NOTE(review): `Partitioner` is not used in the visible code.
  import org.apache.spark.Partitioner

  def main(args: Array[String]) {
    val brokers = args(0)
    val topics = args(1)
    // NOTE(review): `checkpointDir` is never used below; presumably it should be passed to
    // `ssc.checkpoint(...)` for the windowed pattern matching.
    val checkpointDir = args(2)

    // Create context with a 6 second batch interval
    // NOTE(review): the SparkConf construction below is truncated after `new`.
    val sparkConf = new
    val ssc = new StreamingContext(sparkConf, Seconds(6))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest")

    case class Tick(symbol: String, price: Int, ts: Long)

    // Read a stream of messages from Kafka
    val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Parse each message value into a Tick. Tokens are split on ',' and ':' and the symbol,
    // price and timestamp are read from tokens 1, 3 and 5 — presumably a format like
    // `symbol:ABC,price:10,ts:1234` (confirm); malformed messages will throw.
    val ticks = messages.map(_._2)
      .map(_.split("[,:]")).map(p => {
      Tick(p(1), p(3).trim.toInt, p(5).trim.toLong)
    // NOTE(review): the closing `})` of this `map` appears to be missing.

    // Key each tick by its timestamp.
    val kvTicks = ticks.map(t => (t.ts, t))

    // Define rise, drop & deep predicates. Each compares the incoming tick's price with the
    // `first` and `prev` ticks exposed by WindowState (presumably the first and the previous
    // event of the match in progress — confirm):
    //   rise: above first and not below prev; drop: not below first but below prev;
    //   deep: below both first and prev.
    // NOTE(review): the closing `}` of each predicate body appears to be missing.
    def rise(in: Tick, ew: WindowState): Boolean = {
      in.price > ew.first.asInstanceOf[Tick].price  &&
        in.price >= ew.prev.asInstanceOf[Tick].price
    def drop(in: Tick, ew: WindowState): Boolean = {
      in.price >= ew.first.asInstanceOf[Tick].price  &&
        in.price < ew.prev.asInstanceOf[Tick].price
    def deep(in: Tick, ew: WindowState): Boolean = {
      in.price < ew.first.asInstanceOf[Tick].price &&
        in.price < ew.prev.asInstanceOf[Tick].price

    // map the predicates to their state names
    val predicateMapping: Map[String, (Tick, WindowState) => Boolean] =
      Map("rise" -> rise, "drop" -> drop, "deep" -> deep)

    // Presumably the arguments are: pattern, predicates, window length (12 s) and slide
    // interval (6 s) — confirm against the CEP API.
    val matches = kvTicks.patternMatchByWindow("rise drop [rise ]+ deep".r,
      predicateMapping, Seconds(12), Seconds(6))

    // NOTE(review): `matches` is never consumed. Spark Streaming refuses to start without at
    // least one output operation (e.g. `matches.print()`).



    // Start the computation
    // NOTE(review): the start/await calls and the closing braces of `main` and the object
    // are missing after this comment.


