So is there any way of creating an RDD without using offsetRanges? Sorry for the lack of clarity here.
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 23 April 2016 at 00:13, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> So there is really no point in using it :(
>
> On 23 April 2016 at 00:11, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> The class is private:
>>
>> final class OffsetRange private(
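For context: the OffsetRange constructor is indeed private in the Spark 1.x Kafka integration, but its companion object exposes public apply/create factories, and KafkaUtils.createRDD expects an Array[OffsetRange] rather than a single instance. A minimal sketch of the batch read, assuming Spark 1.6.x with spark-streaming-kafka on the classpath and the same broker, topic, and offsets used in the code quoted below:

    import _root_.kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    object OffsetRangeSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("OffsetRangeSketch").setMaster("local[2]"))
        // metadata.broker.list (bootstrap.servers is also accepted) tells the direct API where the brokers are
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "rhes564:9092")
        // OffsetRange.apply is the public factory: topic, partition, fromOffset, untilOffset
        val offsetRanges = Array(OffsetRange("newtopic", 0, 110, 220))
        // createRDD takes the Array[OffsetRange] and returns an RDD[(key, value)]
        val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
        rdd.map(_._2).take(10).foreach(println)
        sc.stop()
      }
    }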
>>> set("spark.hadoop.validateOutputSpecs", "false") >>> val sc = new SparkContext(conf) >>> // Create sqlContext based on HiveContext >>> val sqlContext = new HiveContext(sc) >>> import sqlContext.implicits._ >>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) >>> println ("\nStarted at"); sqlContext.sql("SELECT >>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') >>> ").collect.foreach(println) >>> val ssc = new StreamingContext(conf, Seconds(1)) >>> ssc.checkpoint("checkpoint") >>> val kafkaParams = Map[String, String]("bootstrap.servers" -> >>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", >>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" ) >>> val topics = Set("newtopic", "newtopic") >>> val dstream = KafkaUtils.createDirectStream[String, String, >>> StringDecoder, StringDecoder](ssc, kafkaParams, topics) >>> dstream.cache() >>> val lines = dstream.map(_._2) >>> val showResults = lines.filter(_.contains("statement >>> cache")).flatMap(line => line.split("\n,")).map(word => (word, >>> 1)).reduceByKey(_ + _) >>> // Define the offset ranges to read in the batch job >>> val offsetRanges = new OffsetRange("newtopic", 0, 110, 220) >>> // Create the RDD based on the offset ranges >>> val rdd = KafkaUtils.createRDD[String, String, StringDecoder, >>> StringDecoder](sc, kafkaParams, offsetRanges) >>> ssc.start() >>> ssc.awaitTermination() >>> //ssc.stop() >>> println ("\nFinished at"); sqlContext.sql("SELECT >>> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') >>> ").collect.foreach(println) >>> } >>> } >>> >>> >>> With sbt >>> >>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % >>> "provided" >>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" % >>> "provided" >>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" % >>> "provided" >>> libraryDependencies += "junit" % "junit" % "4.12" >>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0" >>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" >>> % "provided" >>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % >>> "1.6.1" >>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6" >>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" >>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" >>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" >>> % "test" >>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % >>> "1.6.1" >>> >>> >>> However, I an getting the following error >>> >>> [info] Loading project definition from >>> /data6/hduser/scala/CEP_assembly/project >>> [info] Set current project to CEP_assembly (in build >>> file:/data6/hduser/scala/CEP_assembly/) >>> [info] Compiling 1 Scala source to >>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes... 
>>> However, I am getting the following error:
>>>
>>> [info] Loading project definition from /data6/hduser/scala/CEP_assembly/project
>>> [info] Set current project to CEP_assembly (in build file:/data6/hduser/scala/CEP_assembly/)
>>> [info] Compiling 1 Scala source to /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>>> [error] /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37: constructor OffsetRange in class OffsetRange cannot be accessed in object CEP_assembly
>>> [error] val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>>
>>> On 22 April 2016 at 18:41, Marcelo Vanzin <van...@cloudera.com> wrote:
>>>
>>>> On Fri, Apr 22, 2016 at 10:38 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>> > I am trying to test Spark with CEP and I have been shown a sample here
>>>> > https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>>>>
>>>> I'm not familiar with CEP, but that's a Spark unit test, so if you're trying to run it outside of the context of Spark unit tests (as it seems you're trying to do), you're going to run into a world of trouble. I'd suggest a different approach where whatever you're trying to do is done through the Spark build, not outside of it.
>>>>
>>>> --
>>>> Marcelo
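Coming back to the opening question: with the direct stream there is no need to hand-construct offset ranges at all, because every micro-batch RDD produced by createDirectStream carries its own ranges, recoverable by casting the RDD to HasOffsetRanges. A minimal sketch, again assuming Spark 1.6.x and the broker and topic names used in the thread:

    import _root_.kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    object OffsetTrackingSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("OffsetTrackingSketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(1))
        val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092", "group.id" -> "StreamTest")
        val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("newtopic"))
        dstream.foreachRDD { rdd =>
          // Each micro-batch RDD from the direct stream implements HasOffsetRanges;
          // the cast must happen on the RDD itself, before any transformation.
          val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          ranges.foreach(r => println(s"${r.topic} ${r.partition} ${r.fromOffset} -> ${r.untilOffset}"))
          // These ranges could be persisted and replayed later via KafkaUtils.createRDD.
        }
        ssc.start()
        ssc.awaitTermination()
      }
    }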