Vlad Badelita created SPARK-21561:
-------------------------------------

Summary: spark-streaming-kafka-010 DStream is not pulling anything from Kafka
Key: SPARK-21561
URL: https://issues.apache.org/jira/browse/SPARK-21561
Project: Spark
Issue Type: Bug
Components: DStreams
Affects Versions: 2.1.1
Reporter: Vlad Badelita

I am trying to use spark-streaming-kafka-0-10 to pull messages from a Kafka topic (broker version 0.10). I have checked that messages are being produced, and I was able to pull them successfully with a plain KafkaConsumer.

When I try to use the Spark Streaming API, however, I get nothing. If I use KafkaUtils.createRDD and specify some offset ranges manually, it works. But when I use createDirectStream, all the RDDs are empty, and when I check the partition offsets, every partition reports an offset of 0.

Here is what I tried:

{code:scala}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val sparkConf = new SparkConf().setAppName("kafkastream")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val topics = Array("my_topic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hostname:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Offset ranges covered by this batch, one entry per Kafka partition
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }

  val rddCount = rdd.count()
  println(s"rdd count: $rddCount")

  // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()
{code}

All partitions show offset ranges from 0 to 0, and all RDDs are empty. I would like the stream to start from the beginning of each partition and also pick up everything that is subsequently produced to it.

I have also tried spark-streaming-kafka-0-8, and it does work. I think this is specific to the 0-10 integration, because everything else works fine.

Thank you!