My bad! I confused myself with the different build.sbt files I tried in different projects. Thanks Cody for pointing that out (again) — spark-streaming-kafka was all I needed.

Kr
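For anyone hitting the same thing later: since the connector pulls in the matching Kafka client transitively, a single sbt dependency is enough. A minimal build.sbt sketch (versions taken from the jars mentioned below in this thread; adjust to your Spark/Scala versions):

```scala
// build.sbt -- minimal sketch, versions per this thread's setup
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // provided by the cluster / spark-submit at runtime
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  // the 0-10 connector; its transitive dependency brings in the
  // correct kafka-clients version, so no separate Kafka jar is needed
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"
)
```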
On 6 Feb 2017 9:02 pm, "Cody Koeninger" <c...@koeninger.org> wrote:

> You should not need to include jars for Kafka; the Spark connectors
> have the appropriate transitive dependency on the correct version.
>
> On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
> > Hi,
> > Not sure if this will help at all, and please take it with a pinch of
> > salt, as I don't have your setup and I am not running on a cluster.
> >
> > I have tried to run a Kafka example, which was originally working on
> > Spark 1.6.1, on Spark 2. These are the jars I am using:
> >
> > spark-streaming-kafka-0-10_2.11_2.0.1.jar
> > kafka_2.11-0.10.1.1
> >
> > And here's the code up to the creation of the direct stream. Apparently,
> > with the new version of the Kafka libs, some properties have to be
> > specified:
> >
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> > import org.apache.spark.storage.StorageLevel
> >
> > import java.util.regex.Pattern
> > import java.util.regex.Matcher
> >
> > import Utilities._
> >
> > import org.apache.spark.streaming.kafka010.KafkaUtils
> > import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> > import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> >
> > /** Working example of listening for log data from Kafka's testLogs
> >   * topic on port 9092.
> >   */
> > object KafkaExample {
> >
> >   def main(args: Array[String]) {
> >
> >     // Create the context with a 1 second batch size
> >     val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
> >
> >     setupLogging()
> >
> >     // Construct a regular expression (regex) to extract fields from
> >     // raw Apache log lines
> >     val pattern = apacheLogPattern()
> >
> >     val kafkaParams = Map(
> >       "metadata.broker.list" -> "localhost:9092",
> >       "bootstrap.servers" -> "localhost:9092",
> >       "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
> >       "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
> >       "group.id" -> "group1")
> >     val topics = List("testLogs").toSet
> >     val lines = KafkaUtils.createDirectStream[String, String](
> >       ssc,
> >       PreferConsistent,
> >       Subscribe[String, String](topics, kafkaParams)
> >     ).map(cr => cr.value())
> >
> > hth
> >
> > marco
> >
> > On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> >> I am getting this error with Spark 2, which works with CDH 5.5.1
> >> (Spark 1.5).
> >>
> >> Admittedly, I am messing around with spark-shell.
> >> However, I am surprised why this does not work with Spark 2 and is
> >> OK with CDH 5.1.
> >>
> >> scala> val dstream = KafkaUtils.createDirectStream[String, String,
> >> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> >>
> >> java.lang.NoClassDefFoundError: Could not initialize class
> >> kafka.consumer.FetchRequestAndResponseStatsRegistry$
> >>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
> >>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
> >>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> >>   at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
> >>   at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
> >>   ... 74 elided
> >>
> >> Dr Mich Talebzadeh
> >>
> >> LinkedIn
> >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >> Disclaimer: Use it at your own risk.
> >> Any and all responsibility for any loss, damage or destruction of
> >> data or any other property which may arise from relying on this
> >> email's technical content is explicitly disclaimed. The author will
> >> in no case be liable for any monetary damages arising from such
> >> loss, damage or destruction.
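For readers comparing the two APIs: the failing call above is the 0-8 `createDirectStream` (four type parameters with `StringDecoder`), while the 0-10 connector takes plain new-consumer properties, as in Marco's example. A small self-contained sketch of just that parameter map, runnable without Spark on the classpath (broker address and group id are placeholders):

```scala
// Sketch of the consumer properties the 0-10 integration expects.
// Values are placeholders; the point is which keys are required.
object KafkaParamsSketch {
  val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    // group.id is mandatory for the new consumer, unlike the 0-8 direct stream
    "group.id" -> "group1"
  )

  def main(args: Array[String]): Unit = {
    // sanity-check that all four required keys are present
    val required = Set("bootstrap.servers", "key.deserializer",
      "value.deserializer", "group.id")
    assert(required.subsetOf(kafkaParams.keySet))
    println(kafkaParams("group.id"))
  }
}
```

This map is what gets passed to `Subscribe[String, String](topics, kafkaParams)` in the 0-10 API.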