My bad! I confused myself with the different build.sbt files I tried in different
projects.
Thanks Cody for pointing that out (again).
spark-streaming-kafka was all I needed.
Kr
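
P.S. For anyone else hitting this, the fix boils down to a single sbt line along
these lines (a minimal sketch; adjust the Scala/Spark versions to your build):

  // build.sbt: the 0-10 connector pulls in a matching kafka-clients transitively
  libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"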

On 6 Feb 2017 9:02 pm, "Cody Koeninger" <c...@koeninger.org> wrote:

> You should not need to include jars for Kafka; the Spark connectors
> have the appropriate transitive dependency on the correct version.
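>
> (Concretely, for the spark-shell case that means pulling in only the connector
> and letting it resolve kafka-clients itself, e.g. something along these lines,
> where the version is just an example:
>
>   spark-shell --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.0.1
>
> and then using the kafka010 createDirectStream API rather than the old 0.8
> StringDecoder one.)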
>
> On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
> > Hi
> >  Not sure if this will help at all, and please take it with a pinch of salt,
> > as I don't have your setup and I am not running on a cluster.
> >
> >  I have tried to run a Kafka example, which was originally working on Spark
> > 1.6.1, on Spark 2.
> > These are the jars I am using:
> >
> > spark-streaming-kafka-0-10_2.11-2.0.1.jar
> >
> > kafka_2.11-0.10.1.1
> >
> >
> > And here's the code up to the creation of the Direct Stream. Apparently, with
> > the new version of the Kafka libs, some properties have to be specified:
> >
> >
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> > import org.apache.spark.storage.StorageLevel
> >
> > import java.util.regex.Pattern
> > import java.util.regex.Matcher
> >
> > import Utilities._
> >
> > import org.apache.spark.streaming.kafka010.KafkaUtils
> > import kafka.serializer.StringDecoder
> > import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> > import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> >
> > /** Working example of listening for log data from Kafka's testLogs topic on port 9092. */
> > object KafkaExample {
> >
> >   def main(args: Array[String]) {
> >
> >     // Create the context with a 1 second batch size
> >     val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
> >
> >     setupLogging()
> >
> >     // Construct a regular expression (regex) to extract fields from raw Apache log lines
> >     val pattern = apacheLogPattern()
> >
> >     val kafkaParams = Map(
> >       "metadata.broker.list" -> "localhost:9092",
> >       "bootstrap.servers" -> "localhost:9092",
> >       "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
> >       "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
> >       "group.id" -> "group1")
> >     val topics = List("testLogs").toSet
> >     val lines = KafkaUtils.createDirectStream[String, String](
> >       ssc,
> >       PreferConsistent,
> >       Subscribe[String, String](topics, kafkaParams)
> >     ).map(cr => cr.value())
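> >
> > The snippet stops at the stream creation; the rest is roughly the usual
> > boilerplate (a minimal sketch, just printing the stream):
> >
> >     lines.print()          // or whatever processing you need on the DStream
> >
> >     ssc.start()            // start the streaming job
> >     ssc.awaitTermination() // block until it is stopped
> >   }
> > }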
> >
> > hth
> >
> >  marco
> >
> > On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
> > wrote:
> >>
> >> I am getting this error with Spark 2, whereas it works with CDH 5.5.1
> >> (Spark 1.5).
> >>
> >> Admittedly I am messing around with spark-shell. However, I am surprised
> >> that this does not work with Spark 2 but is OK with CDH 5.5.1.
> >>
> >> scala>     val dstream = KafkaUtils.createDirectStream[String, String,
> >> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> >>
> >> java.lang.NoClassDefFoundError: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$
> >>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
> >>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> >>   at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
> >>   at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> >>   ... 74 elided
> >>
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn
> >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> Disclaimer: Use it at your own risk. Any and all responsibility for any
> >> loss, damage or destruction of data or any other property which may arise
> >> from relying on this email's technical content is explicitly disclaimed. The
> >> author will in no case be liable for any monetary damages arising from such
> >> loss, damage or destruction.
> >>
> >>
> >
> >
>
