So is there any way of creating an RDD without using offsetRanges? Sorry for
the lack of clarity here.


val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
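
For completeness, createRDD itself always seems to require offset ranges (that is the contract of the batch API), but the private constructor can be avoided: if I am reading the 1.6.x spark-streaming-kafka (Kafka 0.8 integration) API correctly, the OffsetRange companion object exposes public factory methods, and createRDD expects an Array[OffsetRange] rather than a single OffsetRange. A rough sketch only, reusing the topic, partition and offsets from my earlier code:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// The OffsetRange constructor is private, but the companion object's
// factory method is public (sketch, assuming the 0.8 integration in 1.6.x)
val offsetRanges: Array[OffsetRange] = Array(
  OffsetRange.create("newtopic", 0, 110L, 220L)  // topic, partition, fromOffset, untilOffset
)

// createRDD takes the whole array of ranges, not a single OffsetRange
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

And if the aim is to avoid constructing offsets by hand altogether, createDirectStream (which my earlier code already uses) computes the ranges per batch itself; as far as I can tell they can be read back inside foreachRDD via HasOffsetRanges, something like this sketch (again reusing dstream from the code below):

import org.apache.spark.streaming.kafka.HasOffsetRanges

dstream.foreachRDD { rdd =>
  // Must be done on the RDD produced by the direct stream, before any repartitioning
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic} p${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
}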



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 23 April 2016 at 00:13, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> So there is really no point in using it :(
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 23 April 2016 at 00:11, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> The constructor is private:
>>
>> final class OffsetRange private(
>>
>> On Fri, Apr 22, 2016 at 4:08 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> OK, I decided to forgo that approach and use an existing program of mine
>>> with a slight modification. The code is this:
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.functions._
>>> import _root_.kafka.serializer.StringDecoder
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>> import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
>>> //
>>> object CEP_assembly {
>>>   def main(args: Array[String]) {
>>>   val conf = new SparkConf().
>>>                setAppName("CEP_assembly").
>>>                setMaster("local[2]").
>>>                set("spark.driver.allowMultipleContexts", "true").
>>>                set("spark.hadoop.validateOutputSpecs", "false")
>>>   val sc = new SparkContext(conf)
>>>   // Create sqlContext based on HiveContext
>>>   val sqlContext = new HiveContext(sc)
>>>   import sqlContext.implicits._
>>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>   println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>>> val ssc = new StreamingContext(conf, Seconds(1))
>>> ssc.checkpoint("checkpoint")
>>> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>> val topics = Set("newtopic", "newtopic")
>>> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>> dstream.cache()
>>> val lines = dstream.map(_._2)
>>> val showResults = lines.filter(_.contains("statement cache")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>> // Define the offset ranges to read in the batch job
>>> val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>> // Create the RDD based on the offset ranges
>>> val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
>>> ssc.start()
>>> ssc.awaitTermination()
>>> //ssc.stop()
>>>   println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>>>   }
>>> }
>>>
>>>
>>> With sbt
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" % "provided"
>>> libraryDependencies += "junit" % "junit" % "4.12"
>>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
>>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
>>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" % "test"
>>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"
>>>
>>>
>>> However, I am getting the following error:
>>>
>>> [info] Loading project definition from /data6/hduser/scala/CEP_assembly/project
>>> [info] Set current project to CEP_assembly (in build file:/data6/hduser/scala/CEP_assembly/)
>>> [info] Compiling 1 Scala source to /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>>> [error] /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37: constructor OffsetRange in class OffsetRange cannot be accessed in object CEP_assembly
>>> [error] val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 22 April 2016 at 18:41, Marcelo Vanzin <van...@cloudera.com> wrote:
>>>
>>>> On Fri, Apr 22, 2016 at 10:38 AM, Mich Talebzadeh
>>>> <mich.talebza...@gmail.com> wrote:
>>>> > I am trying to test Spark with CEP and I have been shown a sample here
>>>> >
>>>> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>>>>
>>>> I'm not familiar with CEP, but that's a Spark unit test, so if you're
>>>> trying to run it outside of the context of Spark unit tests (as it
>>>> seems you're trying to do), you're going to run into a world of
>>>> trouble. I'd suggest a different approach where whatever you're trying
>>>> to do is done through the Spark build, not outside of it.
>>>>
>>>> --
>>>> Marcelo
>>>>
>>>
>>>
>>
>
