Hi Marcelo,

Thanks for your input.

I am trying to test Spark with CEP, and I was pointed to this sample:


https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

It is a long piece of code:

package org.apache.spark.streaming.kafka

import java.io.File
import java.util.Arrays
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.Utils

class DirectKafkaStreamSuite
  extends SparkFunSuite
  with BeforeAndAfter
  with BeforeAndAfterAll
  with Eventually
  with Logging {
  val sparkConf = new SparkConf()
    .setMaster("local[4]")
    .setAppName(this.getClass.getSimpleName)

  private var sc: SparkContext = _
  private var ssc: StreamingContext = _
  private var testDir: File = _

  private var kafkaTestUtils: KafkaTestUtils = _

  override def beforeAll {
    kafkaTestUtils = new KafkaTestUtils
    kafkaTestUtils.setup()
  }

  override def afterAll {
    if (kafkaTestUtils != null) {
      kafkaTestUtils.teardown()
      kafkaTestUtils = null
    }
  }

  after {
    if (ssc != null) {
      ssc.stop()
      sc = null
    }
    if (sc != null) {
      sc.stop()
    }
    if (testDir != null) {
      Utils.deleteRecursively(testDir)
    }
  }


  test("basic stream receiving with multiple topics and smallest
starting offset") {
    val topics = Set("basic1", "basic2", "basic3")
    val data = Map("a" -> 7, "b" -> 9)
    topics.foreach { t =>
      kafkaTestUtils.createTopic(t)
      kafkaTestUtils.sendMessages(t, data)
    }
    val totalSent = data.values.sum * topics.size
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    )

    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
    }

    val allReceived = new ConcurrentLinkedQueue[(String, String)]()

    // hold a reference to the current offset ranges, so it can be used downstream
    var offsetRanges = Array[OffsetRange]()

    stream.transform { rdd =>
      // Get the offset ranges in the RDD
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
      // For each partition, get size of the range in the partition,
      // and the number of items in the partition
        val off = offsetRanges(i)
        val all = iter.toSeq
        val partSize = all.size
        val rangeSize = off.untilOffset - off.fromOffset
        Iterator((partSize, rangeSize))
      }.collect

      // Verify that the number of elements in each partition
      // matches the size of the corresponding offset range
      collected.foreach { case (partSize, rangeSize) =>
        assert(partSize === rangeSize, "offset ranges are wrong")
      }
    }
    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
    ssc.start()
    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
      assert(allReceived.size === totalSent,
        "didn't get expected number of messages, messages:\n" +
          allReceived.asScala.mkString("\n"))
    }
    ssc.stop()
  }

  test("receiving from largest starting offset") {
    val topic = "largest"
    val topicPartition = TopicAndPartition(topic, 0)
    val data = Map("a" -> 10)
    kafkaTestUtils.createTopic(topic)
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "largest"
    )
    val kc = new KafkaCluster(kafkaParams)
    def getLatestOffset(): Long = {
      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
    }

    // Send some initial messages before starting context
    kafkaTestUtils.sendMessages(topic, data)
    eventually(timeout(10 seconds), interval(20 milliseconds)) {
      assert(getLatestOffset() > 3)
    }
    val offsetBeforeStart = getLatestOffset()

    // Setup context and kafka stream with largest offset
    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
    }
    assert(
      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
        .fromOffsets(topicPartition) >= offsetBeforeStart,
      "Start offset not from latest"
    )

    val collectedData = new ConcurrentLinkedQueue[String]()
    stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
    ssc.start()
    val newData = Map("b" -> 10)
    kafkaTestUtils.sendMessages(topic, newData)
    eventually(timeout(10 seconds), interval(50 milliseconds)) {
      collectedData.contains("b")
    }
    assert(!collectedData.contains("a"))
  }


  test("creating stream by offset") {
    val topic = "offset"
    val topicPartition = TopicAndPartition(topic, 0)
    val data = Map("a" -> 10)
    kafkaTestUtils.createTopic(topic)
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "largest"
    )
    val kc = new KafkaCluster(kafkaParams)
    def getLatestOffset(): Long = {
      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
    }

    // Send some initial messages before starting context
    kafkaTestUtils.sendMessages(topic, data)
    eventually(timeout(10 seconds), interval(20 milliseconds)) {
      assert(getLatestOffset() >= 10)
    }
    val offsetBeforeStart = getLatestOffset()

    // Setup context and kafka stream with largest offset
    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
        ssc, kafkaParams, Map(topicPartition -> 11L),
        (m: MessageAndMetadata[String, String]) => m.message())
    }
    assert(
      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
        .fromOffsets(topicPartition) >= offsetBeforeStart,
      "Start offset not from latest"
    )

    val collectedData = new ConcurrentLinkedQueue[String]()
    stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
    ssc.start()
    val newData = Map("b" -> 10)
    kafkaTestUtils.sendMessages(topic, newData)
    eventually(timeout(10 seconds), interval(50 milliseconds)) {
      collectedData.contains("b")
    }
    assert(!collectedData.contains("a"))
  }

  // Test to verify the offset ranges can be recovered from the checkpoints
  test("offset recovery") {
    val topic = "recovery"
    kafkaTestUtils.createTopic(topic)
    testDir = Utils.createTempDir()

    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    )

    // Send data to Kafka and wait for it to be received
    def sendDataAndWaitForReceive(data: Seq[Int]) {
      val strings = data.map { _.toString}
      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
      eventually(timeout(10 seconds), interval(50 milliseconds)) {
        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
      }
    }

    // Setup the streaming context
    ssc = new StreamingContext(sparkConf, Milliseconds(100))
    val kafkaStream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
    }
    val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
    val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))
    }
    ssc.checkpoint(testDir.getAbsolutePath)

    // This is to collect the raw data received from Kafka
    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
      val data = rdd.map { _._2 }.collect()
      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
    }

    // This is to ensure that all the data is eventually received exactly once
    stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
    }
    ssc.start()

    // Send some data and wait for them to be received
    for (i <- (1 to 10).grouped(4)) {
      sendDataAndWaitForReceive(i)
    }

    // Verify that offset ranges were generated
    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
    assert(
      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
      "starting offset not zero"
    )
    ssc.stop()
    logInfo("====== RESTARTING ========")

    // Recover context from checkpoints
    ssc = new StreamingContext(testDir.getAbsolutePath)
    val recoveredStream =
      ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]

    // Verify offset ranges have been recovered
    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
    assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
    assert(
      recoveredOffsetRanges.forall { or =>
        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
      },
      "Recovered ranges are not the same as the ones generated"
    )
    // Restart context, send more data and verify the total at the end
    // If the total is right, that means each record has been received only once
    ssc.start()
    sendDataAndWaitForReceive(11 to 20)
    eventually(timeout(10 seconds), interval(50 milliseconds)) {
      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
    }
    ssc.stop()
  }

  test("Direct Kafka stream report input information") {
    val topic = "report-test"
    val data = Map("a" -> 7, "b" -> 9)
    kafkaTestUtils.createTopic(topic)
    kafkaTestUtils.sendMessages(topic, data)

    val totalSent = data.values.sum
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    )

    import DirectKafkaStreamSuite._
    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val collector = new InputInfoCollector
    ssc.addStreamingListener(collector)

    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
    }

    val allReceived = new ConcurrentLinkedQueue[(String, String)]

    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
    ssc.start()
    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
      assert(allReceived.size === totalSent,
        "didn't get expected number of messages, messages:\n" +
          allReceived.asScala.mkString("\n"))

      // Check that the record counts collected by the StreamingListener match.
      assert(collector.numRecordsSubmitted.get() === totalSent)
      assert(collector.numRecordsStarted.get() === totalSent)
      assert(collector.numRecordsCompleted.get() === totalSent)
    }
    ssc.stop()
  }

  test("maxMessagesPerPartition with backpressure disabled") {
    val topic = "maxMessagesPerPartition"
    val kafkaStream = getDirectKafkaStream(topic, None)

    val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
    assert(kafkaStream.maxMessagesPerPartition(input).get ==
      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
  }

  test("maxMessagesPerPartition with no lag") {
    val topic = "maxMessagesPerPartition"
    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
    val kafkaStream = getDirectKafkaStream(topic, rateController)

    val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
  }

  test("maxMessagesPerPartition respects max rate") {
    val topic = "maxMessagesPerPartition"
    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
    val kafkaStream = getDirectKafkaStream(topic, rateController)

    val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
    assert(kafkaStream.maxMessagesPerPartition(input).get ==
      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
  }

  test("using rate controller") {
    val topic = "backpressure"
    val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
    kafkaTestUtils.createTopic(topic, 2)
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    )

    val batchIntervalMilliseconds = 100
    val estimator = new ConstantEstimator(100)
    val messages = Map("foo" -> 200)
    kafkaTestUtils.sendMessages(topic, messages)

    val sparkConf = new SparkConf()
      // Safe, even with streaming, because we're using the direct API.
      // Using 1 core is useful to make the test more predictable.
      .setMaster("local[1]")
      .setAppName(this.getClass.getSimpleName)
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // Setup the streaming context
    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))

    val kafkaStream = withClue("Error creating direct stream") {
      val kc = new KafkaCluster(kafkaParams)
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
      val m = kc.getEarliestLeaderOffsets(topicPartitions)
        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))

      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, m, messageHandler) {
        override protected[streaming] val rateController =
          Some(new DirectKafkaRateController(id, estimator))
      }
    }

    val collectedData = new ConcurrentLinkedQueue[Array[String]]()

    // Used for assertion failure messages.
    def dataToString: String =
      collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")

    // This is to collect the raw data received from Kafka
    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
      val data = rdd.map { _._2 }.collect()
      collectedData.add(data)
    }

    ssc.start()

    // Try different rate limits.
    // Wait for arrays of data to appear matching the rate.
    Seq(100, 50, 20).foreach { rate =>
      collectedData.clear()       // Empty this buffer on each pass.
      estimator.updateRate(rate)  // Set a new rate.
      // Expect blocks of data equal to "rate", scaled by the interval
length in secs.
      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
        // Assert that rate estimator values are used to determine maxMessagesPerPartition.
        // Funky "-" in message makes the complete assertion message read better.
        assert(collectedData.asScala.exists(_.size == expectedSize),
          s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
      }
    }

    ssc.stop()
  }

  /** Get the generated offset ranges from the DirectKafkaStream */
  private def getOffsetRanges[K, V](
      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
    kafkaStream.generatedRDDs.mapValues { rdd =>
      rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
    }.toSeq.sortBy { _._1 }
  }

  private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
    val batchIntervalMilliseconds = 100

    val sparkConf = new SparkConf()
      .setMaster("local[1]")
      .setAppName(this.getClass.getSimpleName)
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // Setup the streaming context
    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))

    val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, Map[String, String](), earliestOffsets, messageHandler) {
      override protected[streaming] val rateController = mockRateController
    }
  }
}

object DirectKafkaStreamSuite {
  val collectedData = new ConcurrentLinkedQueue[String]()
  @volatile var total = -1L

  class InputInfoCollector extends StreamingListener {
    val numRecordsSubmitted = new AtomicLong(0L)
    val numRecordsStarted = new AtomicLong(0L)
    val numRecordsCompleted = new AtomicLong(0L)

    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
      numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
    }

    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
      numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
    }

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
    }
  }
}

private[streaming] class ConstantEstimator(@volatile private var rate: Long)
  extends RateEstimator {

  def updateRate(newRate: Long): Unit = {
    rate = newRate
  }

  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double] = Some(rate)
}

private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
  extends RateController(id, estimator) {
  override def publish(rate: Long): Unit = ()
  override def getLatestRate(): Long = rate
}

/**
  * Temporarily added some samples for the CEP APIs here.
  * TODO: add test cases in streaming/src/test/scala/CepSuite.scala
  */

import org.apache.spark.streaming.dstream.WindowState

object PatternMatchByKey {

  import org.apache.spark.streaming._
  import org.apache.spark.Partitioner

  def main(args: Array[String]) {
    val brokers = args(0)
    val topics = args(1)
    val checkpointDir = args(2)


    // Create context with a 6 second batch interval
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PatternMatchByKey")
    val ssc = new StreamingContext(sparkConf, Seconds(6))
    ssc.checkpoint(checkpointDir)

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest")

    case class Tick(symbol: String, price: Int, ts: Long)

    // Read a stream of messages from Kafka
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Parse each message into a Tick object
    val ticks = messages.map(_._2)
      .map(_.split("[,:]"))
      .map(p => Tick(p(1), p(3).trim.toInt, p(5).trim.toLong))
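    // Assumption on my part about the feed format: given split("[,:]") and the
    // indices 1, 3 and 5, a message presumably looks like
    // "symbol:ORCL, price: 35, timestamp: 1461340800" (label:value pairs), so that
    // p(1) is the symbol, p(3) the price and p(5) the timestamp.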

    // key each Tick by its symbol
    val kvTicks = ticks.map(t => (t.symbol, t))
    kvTicks.cache()


    // define rise, drop & deep predicates
    def rise(in: Tick, ew: WindowState): Boolean = {
      in.price > ew.first.asInstanceOf[Tick].price  &&
        in.price >= ew.prev.asInstanceOf[Tick].price
    }
    def drop(in: Tick, ew: WindowState): Boolean = {
      in.price >= ew.first.asInstanceOf[Tick].price  &&
        in.price < ew.prev.asInstanceOf[Tick].price
    }
    def deep(in: Tick, ew: WindowState): Boolean = {
      in.price < ew.first.asInstanceOf[Tick].price &&
        in.price < ew.prev.asInstanceOf[Tick].price
    }

    // map the predicates to their state names
    val predicateMapping: Map[String, (Tick, WindowState) => Boolean] =
      Map("rise" -> rise, "drop" -> drop, "deep" -> deep)

    val matches = kvTicks.patternMatchByKeyAndWindow("rise drop [rise
]+ deep".r,
      predicateMapping, (in: Tick) => in.ts, Seconds(12), Seconds(6))
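    // Assumption on my part: as with reduceByKeyAndWindow, Seconds(12) is the window
    // length and Seconds(6) the slide interval, so each 12 second window spans two of
    // the 6 second batches above.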

    matches.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

object PatternMatch {

  import org.apache.spark.streaming._
  import org.apache.spark.Partitioner

  def main(args: Array[String]) {
    val brokers = args(0)
    val topics = args(1)
    val checkpointDir = args(2)


    // Create context with a 6 second batch interval
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PatternMatch")
    val ssc = new StreamingContext(sparkConf, Seconds(6))
    ssc.checkpoint(checkpointDir)

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "auto.offset.reset" -> "smallest")

    case class Tick(symbol: String, price: Int, ts: Long)

    // Read a stream of messages from Kafka
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Parse each message into a Tick object
    val ticks = messages.map(_._2)
      .map(_.split("[,:]"))
      .map(p => Tick(p(1), p(3).trim.toInt, p(5).trim.toLong))

    // key each Tick by its timestamp
    val kvTicks = ticks.map(t => (t.ts, t))

    // define rise, drop & deep predicates
    def rise(in: Tick, ew: WindowState): Boolean = {
      in.price > ew.first.asInstanceOf[Tick].price  &&
        in.price >= ew.prev.asInstanceOf[Tick].price
    }
    def drop(in: Tick, ew: WindowState): Boolean = {
      in.price >= ew.first.asInstanceOf[Tick].price  &&
        in.price < ew.prev.asInstanceOf[Tick].price
    }
    def deep(in: Tick, ew: WindowState): Boolean = {
      in.price < ew.first.asInstanceOf[Tick].price &&
        in.price < ew.prev.asInstanceOf[Tick].price
    }

    // map the predicates to their state names
    val predicateMapping: Map[String, (Tick, WindowState) => Boolean] =
      Map("rise" -> rise, "drop" -> drop, "deep" -> deep)

    val matches = kvTicks.patternMatchByWindow("rise drop [rise ]+ deep".r,
      predicateMapping, Seconds(12), Seconds(6))

    matches.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
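
On your point about the internal classes: the sample mixes in SparkFunSuite and org.apache.spark.internal.Logging, which as you say are private[spark], so presumably it is only meant to be built inside the Spark tree itself (and org.apache.spark.internal.Logging is not in the 1.5.1 jar I tried). If I wanted to lift the tests out into my own project, I imagine the suite header would have to look something like this (just a sketch on my part, assuming plain ScalaTest and slf4j are acceptable stand-ins; DirectKafkaCepSuite is a made-up name):

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.Eventually
import org.slf4j.LoggerFactory

// Hypothetical standalone version of the suite above, avoiding Spark-internal traits
class DirectKafkaCepSuite
  extends FunSuite          // instead of the private[spark] SparkFunSuite
  with BeforeAndAfter
  with BeforeAndAfterAll
  with Eventually {

  // instead of mixing in the private[spark] Logging trait
  private val log = LoggerFactory.getLogger(getClass)

  test("placeholder") {
    // the test bodies from the sample above would go here,
    // using log.info(...) in place of logInfo(...)
    log.info("running standalone CEP suite")
  }
}

I assume KafkaTestUtils is also private to Spark's Kafka module, so that part would presumably need its own embedded Kafka/ZooKeeper helper as well.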


Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
On 22 April 2016 at 18:31, Marcelo Vanzin <van...@cloudera.com> wrote:

> Sorry, I've been looking at this thread and the related ones and one
> thing I still don't understand is: why are you trying to use internal
> Spark classes like Logging and SparkFunSuite in your code?
>
> Unless you're writing code that lives inside Spark, you really
> shouldn't be trying to reference them. First reason being that they're
> "private[spark]" and even if they're available, the compiler won't let
> you.
>
> On Fri, Apr 22, 2016 at 12:21 AM, Mich Talebzadeh
> <mich.talebza...@gmail.com> wrote:
> >
> > Hi,
> >
> > Anyone know which jar file has  import org.apache.spark.internal.Logging?
> >
> > I tried spark-core_2.10-1.5.1.jar
> >
> > but does not seem to work
> >
> > scala> import org.apache.spark.internal.Logging
> >
> > <console>:57: error: object internal is not a member of package
> > org.apache.spark
> >          import org.apache.spark.internal.Logging
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
>
>
>
> --
> Marcelo
>
